diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
| commit | cfaebb20d8906d31cedc59e39e6a9286967a931a (patch) | |
| tree | 33d0c3994bfb658937e2e692a946bc8a2eca4fe5 /std/atomic/queue_mpsc.zig | |
| parent | b5d07297dec61a3993dfe91ceee2c87672db1e8e (diff) | |
| parent | 0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff) | |
| download | zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.tar.gz zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/atomic/queue_mpsc.zig')
| -rw-r--r-- | std/atomic/queue_mpsc.zig | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 8030565d7a..978e189453 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type { pub const Node = std.atomic.Stack(T).Node; + /// Not thread-safe. The call to init() must complete before any other functions are called. + /// No deinitialization required. pub fn init() Self { return Self{ .inboxes = []std.atomic.Stack(T){ @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type { }; } + /// Fully thread-safe. put() may be called from any thread at any time. pub fn put(self: *Self, node: *Node) void { const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst); const inbox = &self.inboxes[inbox_index]; inbox.push(node); } + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to get(). pub fn get(self: *Self) ?*Node { if (self.outbox.pop()) |node| { return node; @@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type { } return self.outbox.pop(); } + + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to isEmpty(). + pub fn isEmpty(self: *Self) bool { + if (!self.outbox.isEmpty()) return false; + const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst); + const prev_inbox = &self.inboxes[prev_inbox_index]; + while (prev_inbox.pop()) |node| { + self.outbox.push(node); + } + return self.outbox.isEmpty(); + } + + /// For debugging only. No API guarantees about what this does. + pub fn dump(self: *Self) void { + { + var it = self.outbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + const inbox_index = self.inbox_index; + const inboxes = []*std.atomic.Stack(T){ + &self.inboxes[self.inbox_index], + &self.inboxes[1 - self.inbox_index], + }; + for (inboxes) |inbox| { + var it = inbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + + std.debug.warn("null\n"); + } }; } |
