path: root/std/event.zig
author    Andrew Kelley <superjoe30@gmail.com>  2018-06-29 15:39:55 -0400
committer Andrew Kelley <superjoe30@gmail.com>  2018-07-02 14:38:11 -0400
commit    a3f55aaf34f0a459c8aec4b35e55ad4534eaca30
tree      5799189e210d53271de654a2f713e7d5f9056fed /std/event.zig
parent    2759c7951da050d825cf765c4b660f5562fb01a4
add event loop Channel abstraction
This is akin to channels in Go, except:

* implemented in userland
* they are lock-free and thread-safe
* they integrate with the userland event loop

The self-hosted compiler is changed to use a channel for events and made to stay alive, watching files and performing builds when things change; however, the main.zig file exits after 1 build. Note that nothing is actually built yet: it just parses the input and then declares that the build succeeded.

Next items to do:

* add windows and macos support for std.event.Loop
* improve the event loop stop() operation
* make the event loop multiplex coroutines onto kernel threads
* watch source files for updates, and provide AST diffs (at least list the top-level declaration changes)
* top-level declaration analysis
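For orientation before the diff itself, here is a minimal usage sketch, condensed from the "std.event.Channel" test added at the bottom of this commit. It uses the 2018-era async/promise syntax as the test does; it assumes Loop and Channel are reachable as std.event.Loop and std.event.Channel, and produce/consume are illustrative names:

    const std = @import("std");
    const Loop = std.event.Loop;
    const Channel = std.event.Channel;
    const assert = std.debug.assert;

    test "channel usage sketch" {
        var da = std.heap.DirectAllocator.init();
        defer da.deinit();
        const allocator = &da.allocator;

        var loop = try Loop.init(allocator);
        defer loop.deinit();

        // Capacity 0: a putter suspends until a getter takes the item.
        const channel = try Channel(i32).create(&loop, 0);
        defer channel.destroy();

        const getter = try async<allocator> consume(&loop, channel);
        defer cancel getter;
        const putter = try async<allocator> produce(channel);
        defer cancel putter;

        loop.run();
    }

    async fn produce(channel: *Channel(i32)) void {
        await (async channel.put(42) catch @panic("out of memory"));
    }

    async fn consume(loop: *Loop, channel: *Channel(i32)) void {
        errdefer @panic("test failure");
        const promise = try async channel.get();
        assert((await promise) == 42);
        loop.stop();
    }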
Diffstat (limited to 'std/event.zig')
-rw-r--r--    std/event.zig    279
1 file changed, 277 insertions(+), 2 deletions(-)
diff --git a/std/event.zig b/std/event.zig
index 0821c789b7..7f823bc732 100644
--- a/std/event.zig
+++ b/std/event.zig
@@ -4,6 +4,8 @@ const assert = std.debug.assert;
const event = this;
const mem = std.mem;
const posix = std.os.posix;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
pub const TcpServer = struct {
handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
@@ -95,16 +97,29 @@ pub const Loop = struct {
allocator: *mem.Allocator,
epollfd: i32,
keep_running: bool,
+ next_tick_queue: std.atomic.QueueMpsc(promise),
- fn init(allocator: *mem.Allocator) !Loop {
+ pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
+
+ /// The allocator must be thread-safe because we use it for multiplexing
+ /// coroutines onto kernel threads.
+ pub fn init(allocator: *mem.Allocator) !Loop {
const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
+ errdefer std.os.close(epollfd);
+
return Loop{
.keep_running = true,
.allocator = allocator,
.epollfd = epollfd,
+ .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
};
}
+ /// must call stop before deinit
+ pub fn deinit(self: *Loop) void {
+ std.os.close(self.epollfd);
+ }
+
pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
var ev = std.os.linux.epoll_event{
.events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
@@ -126,11 +141,21 @@ pub const Loop = struct {
pub fn stop(self: *Loop) void {
// TODO make atomic
self.keep_running = false;
- // TODO activate an fd in the epoll set
+ // TODO activate an fd in the epoll set which should cancel all the promises
+ }
+
+    /// Bring your own linked-list node; this means this function cannot fail.
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+ self.next_tick_queue.put(node);
}
pub fn run(self: *Loop) void {
while (self.keep_running) {
+ // TODO multiplex the next tick queue and the epoll event results onto a thread pool
+ while (self.next_tick_queue.get()) |node| {
+ resume node.data;
+ }
+ if (!self.keep_running) break;
var events: [16]std.os.linux.epoll_event = undefined;
const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
for (events[0..count]) |ev| {
@@ -141,6 +166,215 @@ pub const Loop = struct {
}
};
+/// Many-producer, many-consumer, thread-safe, lock-free, runtime-configurable buffer size.
+/// When the buffer is empty, consumers suspend and are resumed by producers.
+/// When the buffer is full, producers suspend and are resumed by consumers.
+pub fn Channel(comptime T: type) type {
+ return struct {
+ loop: *Loop,
+
+ getters: std.atomic.QueueMpsc(GetNode),
+ putters: std.atomic.QueueMpsc(PutNode),
+ get_count: usize,
+ put_count: usize,
+ dispatch_lock: u8, // TODO make this a bool
+ need_dispatch: u8, // TODO make this a bool
+
+ // simple fixed size ring buffer
+ buffer_nodes: []T,
+ buffer_index: usize,
+ buffer_len: usize,
+
+ const SelfChannel = this;
+ const GetNode = struct {
+ ptr: *T,
+ tick_node: *Loop.NextTickNode,
+ };
+ const PutNode = struct {
+ data: T,
+ tick_node: *Loop.NextTickNode,
+ };
+
+ /// call destroy when done
+ pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
+ const buffer_nodes = try loop.allocator.alloc(T, capacity);
+ errdefer loop.allocator.free(buffer_nodes);
+
+ const self = try loop.allocator.create(SelfChannel{
+ .loop = loop,
+ .buffer_len = 0,
+ .buffer_nodes = buffer_nodes,
+ .buffer_index = 0,
+ .dispatch_lock = 0,
+ .need_dispatch = 0,
+ .getters = std.atomic.QueueMpsc(GetNode).init(),
+ .putters = std.atomic.QueueMpsc(PutNode).init(),
+ .get_count = 0,
+ .put_count = 0,
+ });
+ errdefer loop.allocator.destroy(self);
+
+ return self;
+ }
+
+ /// must be called when all calls to put and get have suspended and no more calls occur
+ pub fn destroy(self: *SelfChannel) void {
+ while (self.getters.get()) |get_node| {
+ cancel get_node.data.tick_node.data;
+ }
+ while (self.putters.get()) |put_node| {
+ cancel put_node.data.tick_node.data;
+ }
+ self.loop.allocator.free(self.buffer_nodes);
+ self.loop.allocator.destroy(self);
+ }
+
+        /// Puts a data item into the channel. The promise completes when the value has been added to the
+        /// buffer, or, in the case of a zero-size buffer, when the item has been retrieved by a getter.
+ pub async fn put(self: *SelfChannel, data: T) void {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ suspend |handle| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(PutNode).Node{
+ .data = PutNode{
+ .tick_node = &my_tick_node,
+ .data = data,
+ },
+ .next = undefined,
+ };
+ self.putters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ }
+
+        /// Await this function to get an item from the channel. If the buffer is empty, the promise will
+        /// complete when the next item is put into the channel.
+ pub async fn get(self: *SelfChannel) T {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ // TODO integrate this function with named return values
+ // so we can get rid of this extra result copy
+ var result: T = undefined;
+ var debug_handle: usize = undefined;
+ suspend |handle| {
+ debug_handle = @ptrToInt(handle);
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(GetNode).Node{
+ .data = GetNode{
+ .ptr = &result,
+ .tick_node = &my_tick_node,
+ },
+ .next = undefined,
+ };
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ return result;
+ }
+
+ async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
+ // resumed by onNextTick
+ suspend |handle| {
+ var tick_node = Loop.NextTickNode{
+ .data = handle,
+ .next = undefined,
+ };
+ tick_node_ptr.* = &tick_node;
+ }
+
+ // set the "need dispatch" flag
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+
+ lock: while (true) {
+ // set the lock flag
+ const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (prev_lock != 0) return;
+
+ // clear the need_dispatch flag since we're about to do it
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ while (true) {
+ one_dispatch: {
+ // later we correct these extra subtractions
+ var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+
+ // transfer self.buffer to self.getters
+ while (self.buffer_len != 0) {
+ if (get_count == 0) break :one_dispatch;
+
+ const get_node = &self.getters.get().?.data;
+ get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ self.loop.onNextTick(get_node.tick_node);
+ self.buffer_len -= 1;
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // direct transfer self.putters to self.getters
+ while (get_count != 0 and put_count != 0) {
+ const get_node = &self.getters.get().?.data;
+ const put_node = &self.putters.get().?.data;
+
+ get_node.ptr.* = put_node.data;
+ self.loop.onNextTick(get_node.tick_node);
+ self.loop.onNextTick(put_node.tick_node);
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // transfer self.putters to self.buffer
+ while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
+ const put_node = &self.putters.get().?.data;
+
+ self.buffer_nodes[self.buffer_index] = put_node.data;
+ self.loop.onNextTick(put_node.tick_node);
+ self.buffer_index +%= 1;
+ self.buffer_len += 1;
+
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+
+ // undo the extra subtractions
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ // clear need-dispatch flag
+ const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ if (need_dispatch != 0) continue;
+
+ const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ assert(my_lock != 0);
+
+ // we have to check again now that we unlocked
+ if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
+
+ return;
+ }
+ }
+ }
+ };
+}
+
pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
@@ -199,6 +433,7 @@ test "listen on a port, send bytes, receive bytes" {
defer cancel p;
loop.run();
}
+
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
errdefer @panic("test failure");
@@ -211,3 +446,43 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
assert(mem.eql(u8, msg, "hello from server\n"));
loop.stop();
}
+
+test "std.event.Channel" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop = try Loop.init(allocator);
+ defer loop.deinit();
+
+ const channel = try Channel(i32).create(&loop, 0);
+ defer channel.destroy();
+
+ const handle = try async<allocator> testChannelGetter(&loop, channel);
+ defer cancel handle;
+
+ const putter = try async<allocator> testChannelPutter(channel);
+ defer cancel putter;
+
+ loop.run();
+}
+
+async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+ errdefer @panic("test failed");
+
+ const value1_promise = try async channel.get();
+ const value1 = await value1_promise;
+ assert(value1 == 1234);
+
+ const value2_promise = try async channel.get();
+ const value2 = await value2_promise;
+ assert(value2 == 4567);
+
+ loop.stop();
+}
+
+async fn testChannelPutter(channel: *Channel(i32)) void {
+ await (async channel.put(1234) catch @panic("out of memory"));
+ await (async channel.put(4567) catch @panic("out of memory"));
+}
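A closing note on the dispatch() lock protocol above: both put() and get() spawn a dispatch, but only one coroutine may drain the queues at a time. The trick is the pair of flags: a contender who loses the race for dispatch_lock may simply return, because the holder re-checks need_dispatch after unlocking and loops again if a request landed in that window. Below is a standalone reduction of the pattern (hypothetical names requestDispatch/doWork; the @atomicRmw/@atomicLoad builtins and the labeled-loop form are the same ones the diff uses; the inner re-check-before-unlock optimization from dispatch() is omitted):

    const builtin = @import("builtin");
    const AtomicRmwOp = builtin.AtomicRmwOp;
    const AtomicOrder = builtin.AtomicOrder;

    var need_dispatch: u8 = 0; // "there is pending work"
    var dispatch_lock: u8 = 0; // "someone is draining the queues"

    fn requestDispatch() void {
        // Publish that work exists before competing for the lock.
        _ = @atomicRmw(u8, &need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);

        lock: while (true) {
            // Try to become the dispatcher.
            const prev_lock = @atomicRmw(u8, &dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            if (prev_lock != 0) return; // the holder will observe need_dispatch

            // Claim the pending work and perform it.
            _ = @atomicRmw(u8, &need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
            doWork();

            // Unlock, then re-check: a request that raced in after our claim
            // above but failed to take the lock would otherwise be dropped.
            _ = @atomicRmw(u8, &dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
            if (@atomicLoad(u8, &need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
            return;
        }
    }

    fn doWork() void {
        // Placeholder for Channel.dispatch's real work: moving items between
        // buffer_nodes, the getters queue, and the putters queue.
    }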