diff options
Diffstat (limited to 'std/atomic/queue_mpsc.zig')
| -rw-r--r-- | std/atomic/queue_mpsc.zig | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 8030565d7a..978e189453 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type { pub const Node = std.atomic.Stack(T).Node; + /// Not thread-safe. The call to init() must complete before any other functions are called. + /// No deinitialization required. pub fn init() Self { return Self{ .inboxes = []std.atomic.Stack(T){ @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type { }; } + /// Fully thread-safe. put() may be called from any thread at any time. pub fn put(self: *Self, node: *Node) void { const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst); const inbox = &self.inboxes[inbox_index]; inbox.push(node); } + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to get(). pub fn get(self: *Self) ?*Node { if (self.outbox.pop()) |node| { return node; @@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type { } return self.outbox.pop(); } + + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to isEmpty(). + pub fn isEmpty(self: *Self) bool { + if (!self.outbox.isEmpty()) return false; + const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst); + const prev_inbox = &self.inboxes[prev_inbox_index]; + while (prev_inbox.pop()) |node| { + self.outbox.push(node); + } + return self.outbox.isEmpty(); + } + + /// For debugging only. No API guarantees about what this does. + pub fn dump(self: *Self) void { + { + var it = self.outbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + const inbox_index = self.inbox_index; + const inboxes = []*std.atomic.Stack(T){ + &self.inboxes[self.inbox_index], + &self.inboxes[1 - self.inbox_index], + }; + for (inboxes) |inbox| { + var it = inbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + + std.debug.warn("null\n"); + } }; } |
