aboutsummaryrefslogtreecommitdiff
path: root/std/atomic/queue_mpsc.zig
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-07-09 22:06:47 -0400
committerGitHub <noreply@github.com>2018-07-09 22:06:47 -0400
commitccef60a64033a25dbe2351c27f28257546b2ae5b (patch)
tree67390c7e43f9852cf3786f2eed35ebf04e15510d /std/atomic/queue_mpsc.zig
parent10cc49db1ca1f9b3ac63277c0742e05f6412f3c6 (diff)
parentc89aac85c440ea4cbccf1abdbd6acf84a33077e3 (diff)
downloadzig-ccef60a64033a25dbe2351c27f28257546b2ae5b.tar.gz
zig-ccef60a64033a25dbe2351c27f28257546b2ae5b.zip
Merge pull request #1198 from ziglang/m-n-threading
M:N threading
Diffstat (limited to 'std/atomic/queue_mpsc.zig')
-rw-r--r--std/atomic/queue_mpsc.zig42
1 files changed, 42 insertions, 0 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig
index 8030565d7a..978e189453 100644
--- a/std/atomic/queue_mpsc.zig
+++ b/std/atomic/queue_mpsc.zig
@@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type {
pub const Node = std.atomic.Stack(T).Node;
+ /// Not thread-safe. The call to init() must complete before any other functions are called.
+ /// No deinitialization required.
pub fn init() Self {
return Self{
.inboxes = []std.atomic.Stack(T){
@@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type {
};
}
+ /// Fully thread-safe. put() may be called from any thread at any time.
pub fn put(self: *Self, node: *Node) void {
const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst);
const inbox = &self.inboxes[inbox_index];
inbox.push(node);
}
+ /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
+ /// the next call to get().
pub fn get(self: *Self) ?*Node {
if (self.outbox.pop()) |node| {
return node;
@@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type {
}
return self.outbox.pop();
}
+
+ /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
+ /// the next call to isEmpty().
+ pub fn isEmpty(self: *Self) bool {
+ if (!self.outbox.isEmpty()) return false;
+ const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst);
+ const prev_inbox = &self.inboxes[prev_inbox_index];
+ while (prev_inbox.pop()) |node| {
+ self.outbox.push(node);
+ }
+ return self.outbox.isEmpty();
+ }
+
+ /// For debugging only. No API guarantees about what this does.
+ pub fn dump(self: *Self) void {
+ {
+ var it = self.outbox.root;
+ while (it) |node| {
+ std.debug.warn("0x{x} -> ", @ptrToInt(node));
+ it = node.next;
+ }
+ }
+ const inbox_index = self.inbox_index;
+ const inboxes = []*std.atomic.Stack(T){
+ &self.inboxes[self.inbox_index],
+ &self.inboxes[1 - self.inbox_index],
+ };
+ for (inboxes) |inbox| {
+ var it = inbox.root;
+ while (it) |node| {
+ std.debug.warn("0x{x} -> ", @ptrToInt(node));
+ it = node.next;
+ }
+ }
+
+ std.debug.warn("null\n");
+ }
};
}