Diffstat (limited to 'std/atomic')
-rw-r--r--  std/atomic/queue.zig | 84
1 file changed, 60 insertions(+), 24 deletions(-)
diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig
index df31c88d2a..6948af43ba 100644
--- a/std/atomic/queue.zig
+++ b/std/atomic/queue.zig
@@ -1,40 +1,38 @@
+const std = @import("../index.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
+const assert = std.debug.assert;
/// Many producer, many consumer, non-allocating, thread-safe.
-/// Uses a spinlock to protect get() and put().
+/// Uses a mutex to protect access.
pub fn Queue(comptime T: type) type {
return struct {
head: ?*Node,
tail: ?*Node,
- lock: u8,
+ mutex: std.Mutex,
pub const Self = this;
-
- pub const Node = struct {
- next: ?*Node,
- data: T,
- };
+ pub const Node = std.LinkedList(T).Node;
pub fn init() Self {
return Self{
.head = null,
.tail = null,
- .lock = 0,
+ .mutex = std.Mutex.init(),
};
}
pub fn put(self: *Self, node: *Node) void {
node.next = null;
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
- const opt_tail = self.tail;
+ node.prev = self.tail;
self.tail = node;
- if (opt_tail) |tail| {
- tail.next = node;
+ if (node.prev) |prev_tail| {
+ prev_tail.next = node;
} else {
assert(self.head == null);
self.head = node;
@@ -42,18 +40,27 @@ pub fn Queue(comptime T: type) type {
}
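
Every operation in the new version takes std.Mutex rather than spinning on an @atomicRmw Xchg of a u8, and Node is now reused from std.LinkedList(T), which is what brings in the prev pointer the rest of the patch relies on. A minimal sketch of the acquire/release idiom as the patch applies it, assuming the same Zig version and std.Mutex API used above:

    fn criticalSection(mutex: *std.Mutex) void {
        // acquire() blocks until the lock is held and returns a handle;
        // the deferred release() drops the lock on every return path.
        const held = mutex.acquire();
        defer held.release();
        // ... touch shared state only while the lock is held ...
    }
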
pub fn get(self: *Self) ?*Node {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
const head = self.head orelse return null;
self.head = head.next;
- if (head.next == null) self.tail = null;
+ if (head.next) |new_head| {
+ new_head.prev = null;
+ } else {
+ self.tail = null;
+ }
+ // Fully detach the node so a later remove() treats it as no longer in the queue; this makes get() and remove() thread-safe with each other (see the sketch after this function).
+ head.prev = null;
+ head.next = null;
return head;
}
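
Why get() clears both pointers: remove() treats prev == null, next == null, head != node as "not in the queue", so a node handed out by get() must look fully detached, or a racing remove() of the same node would unlink it a second time and corrupt the list. A sketch of the interaction, in the style of this file's tests (illustrative only, assuming the same Zig version as the patch):

    test "get() detaches the node from remove()'s point of view" {
        var queue = Queue(i32).init();
        var node = Queue(i32).Node{
            .data = 42,
            .next = undefined,
            .prev = undefined,
        };
        queue.put(&node);

        // get() returns the node with prev and next already nulled out.
        assert(queue.get().? == &node);
        // remove() sees a detached node and reports there was nothing to do.
        assert(!queue.remove(&node));
    }
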
pub fn unget(self: *Self, node: *Node) void {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ node.prev = null;
+
+ const held = self.mutex.acquire();
+ defer held.release();
const opt_head = self.head;
self.head = node;
@@ -65,13 +72,39 @@ pub fn Queue(comptime T: type) type {
}
}
+ /// Safe to call concurrently with get() and other remove() calls. Returns whether the node was actually removed.
+ pub fn remove(self: *Self, node: *Node) bool {
+ const held = self.mutex.acquire();
+ defer held.release();
+
+ if (node.prev == null and node.next == null and self.head != node) {
+ return false;
+ }
+
+ if (node.prev) |prev| {
+ prev.next = node.next;
+ } else {
+ self.head = node.next;
+ }
+ if (node.next) |next| {
+ next.prev = node.prev;
+ } else {
+ self.tail = node.prev;
+ }
+ node.prev = null;
+ node.next = null;
+ return true;
+ }
+
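
The prev pointer is what makes remove() O(1): a node's neighbors are reachable directly, with no traversal from head. A sketch of unlinking a middle node, in the style of the single-threaded test further down (assuming the same Zig version as the patch, with queue initialized as in that test):

    var node_a = Queue(i32).Node{ .data = 1, .next = undefined, .prev = undefined };
    var node_b = Queue(i32).Node{ .data = 2, .next = undefined, .prev = undefined };
    var node_c = Queue(i32).Node{ .data = 3, .next = undefined, .prev = undefined };
    queue.put(&node_a);
    queue.put(&node_b);
    queue.put(&node_c);

    // Unlink the middle node in O(1); its neighbors are re-linked to each other.
    assert(queue.remove(&node_b));
    assert(queue.get().? == &node_a);
    assert(queue.get().? == &node_c);
    assert(queue.get() == null);
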
pub fn isEmpty(self: *Self) bool {
- return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null;
+ const held = self.mutex.acquire();
+ defer held.release();
+ return self.head == null;
}
pub fn dump(self: *Self) void {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
std.debug.warn("head: ");
dumpRecursive(self.head, 0);
@@ -93,9 +126,6 @@ pub fn Queue(comptime T: type) type {
};
}
-const std = @import("../index.zig");
-const assert = std.debug.assert;
-
const Context = struct {
allocator: *std.mem.Allocator,
queue: *Queue(i32),
@@ -169,6 +199,7 @@ fn startPuts(ctx: *Context) u8 {
std.os.time.sleep(0, 1); // let the OS scheduler be our fuzzer
const x = @bitCast(i32, r.random.scalar(u32));
const node = ctx.allocator.create(Queue(i32).Node{
+ .prev = undefined,
.next = undefined,
.data = x,
}) catch unreachable;
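
startPuts is the producer loop for the multi-threaded test, which this diff does not otherwise touch. For context, a sketch of how such workers were driven; std.os.spawnThread, std.os.Thread, and their signatures are assumptions based on that era's std API and are not shown in this diff:

    // Hypothetical driver in the style of the era's multi-threaded test.
    var putters: [4]*std.os.Thread = undefined;
    for (putters) |*t| {
        t.* = try std.os.spawnThread(&context, startPuts);
    }
    for (putters) |t| {
        t.wait();
    }
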
@@ -198,12 +229,14 @@ test "std.atomic.Queue single-threaded" {
var node_0 = Queue(i32).Node{
.data = 0,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_0);
var node_1 = Queue(i32).Node{
.data = 1,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_1);
@@ -212,12 +245,14 @@ test "std.atomic.Queue single-threaded" {
var node_2 = Queue(i32).Node{
.data = 2,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_2);
var node_3 = Queue(i32).Node{
.data = 3,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_3);
@@ -228,6 +263,7 @@ test "std.atomic.Queue single-threaded" {
var node_4 = Queue(i32).Node{
.data = 4,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_4);