diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-09 22:06:47 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-07-09 22:06:47 -0400 |
| commit | ccef60a64033a25dbe2351c27f28257546b2ae5b (patch) | |
| tree | 67390c7e43f9852cf3786f2eed35ebf04e15510d /std/atomic | |
| parent | 10cc49db1ca1f9b3ac63277c0742e05f6412f3c6 (diff) | |
| parent | c89aac85c440ea4cbccf1abdbd6acf84a33077e3 (diff) | |
| download | zig-ccef60a64033a25dbe2351c27f28257546b2ae5b.tar.gz zig-ccef60a64033a25dbe2351c27f28257546b2ae5b.zip | |
Merge pull request #1198 from ziglang/m-n-threading
M:N threading
Diffstat (limited to 'std/atomic')
| -rw-r--r-- | std/atomic/queue_mpsc.zig | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 8030565d7a..978e189453 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type { pub const Node = std.atomic.Stack(T).Node; + /// Not thread-safe. The call to init() must complete before any other functions are called. + /// No deinitialization required. pub fn init() Self { return Self{ .inboxes = []std.atomic.Stack(T){ @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type { }; } + /// Fully thread-safe. put() may be called from any thread at any time. pub fn put(self: *Self, node: *Node) void { const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst); const inbox = &self.inboxes[inbox_index]; inbox.push(node); } + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to get(). pub fn get(self: *Self) ?*Node { if (self.outbox.pop()) |node| { return node; @@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type { } return self.outbox.pop(); } + + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to isEmpty(). + pub fn isEmpty(self: *Self) bool { + if (!self.outbox.isEmpty()) return false; + const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst); + const prev_inbox = &self.inboxes[prev_inbox_index]; + while (prev_inbox.pop()) |node| { + self.outbox.push(node); + } + return self.outbox.isEmpty(); + } + + /// For debugging only. No API guarantees about what this does. + pub fn dump(self: *Self) void { + { + var it = self.outbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + const inbox_index = self.inbox_index; + const inboxes = []*std.atomic.Stack(T){ + &self.inboxes[self.inbox_index], + &self.inboxes[1 - self.inbox_index], + }; + for (inboxes) |inbox| { + var it = inbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + + std.debug.warn("null\n"); + } }; } |
