From 821805aa92898cfdb770b87ac916e45e428621b8 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 2 Aug 2018 17:04:17 -0400 Subject: WIP: Channel.getOrNull --- std/atomic/queue.zig | 84 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 24 deletions(-) (limited to 'std/atomic/queue.zig') diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index df31c88d2a..6948af43ba 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -1,40 +1,38 @@ +const std = @import("../index.zig"); const builtin = @import("builtin"); const AtomicOrder = builtin.AtomicOrder; const AtomicRmwOp = builtin.AtomicRmwOp; +const assert = std.debug.assert; /// Many producer, many consumer, non-allocating, thread-safe. -/// Uses a spinlock to protect get() and put(). +/// Uses a mutex to protect access. pub fn Queue(comptime T: type) type { return struct { head: ?*Node, tail: ?*Node, - lock: u8, + mutex: std.Mutex, pub const Self = this; - - pub const Node = struct { - next: ?*Node, - data: T, - }; + pub const Node = std.LinkedList(T).Node; pub fn init() Self { return Self{ .head = null, .tail = null, - .lock = 0, + .mutex = std.Mutex.init(), }; } pub fn put(self: *Self, node: *Node) void { node.next = null; - while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} - defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + const held = self.mutex.acquire(); + defer held.release(); - const opt_tail = self.tail; + node.prev = self.tail; self.tail = node; - if (opt_tail) |tail| { - tail.next = node; + if (node.prev) |prev_tail| { + prev_tail.next = node; } else { assert(self.head == null); self.head = node; @@ -42,18 +40,27 @@ pub fn Queue(comptime T: type) type { } pub fn get(self: *Self) ?*Node { - while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} - defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + const held = self.mutex.acquire(); + defer held.release(); const head = self.head orelse return null; self.head = head.next; - if (head.next == null) self.tail = null; + if (head.next) |new_head| { + new_head.prev = null; + } else { + self.tail = null; + } + // This way, a get() and a remove() are thread-safe with each other. + head.prev = null; + head.next = null; return head; } pub fn unget(self: *Self, node: *Node) void { - while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} - defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + node.prev = null; + + const held = self.mutex.acquire(); + defer held.release(); const opt_head = self.head; self.head = node; @@ -65,13 +72,39 @@ pub fn Queue(comptime T: type) type { } } + /// Thread-safe with get() and remove(). Returns whether node was actually removed. + pub fn remove(self: *Self, node: *Node) bool { + const held = self.mutex.acquire(); + defer held.release(); + + if (node.prev == null and node.next == null and self.head != node) { + return false; + } + + if (node.prev) |prev| { + prev.next = node.next; + } else { + self.head = node.next; + } + if (node.next) |next| { + next.prev = node.prev; + } else { + self.tail = node.prev; + } + node.prev = null; + node.next = null; + return true; + } + pub fn isEmpty(self: *Self) bool { - return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null; + const held = self.mutex.acquire(); + defer held.release(); + return self.head != null; } pub fn dump(self: *Self) void { - while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} - defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + const held = self.mutex.acquire(); + defer held.release(); std.debug.warn("head: "); dumpRecursive(self.head, 0); @@ -93,9 +126,6 @@ pub fn Queue(comptime T: type) type { }; } -const std = @import("../index.zig"); -const assert = std.debug.assert; - const Context = struct { allocator: *std.mem.Allocator, queue: *Queue(i32), @@ -169,6 +199,7 @@ fn startPuts(ctx: *Context) u8 { std.os.time.sleep(0, 1); // let the os scheduler be our fuzz const x = @bitCast(i32, r.random.scalar(u32)); const node = ctx.allocator.create(Queue(i32).Node{ + .prev = undefined, .next = undefined, .data = x, }) catch unreachable; @@ -198,12 +229,14 @@ test "std.atomic.Queue single-threaded" { var node_0 = Queue(i32).Node{ .data = 0, .next = undefined, + .prev = undefined, }; queue.put(&node_0); var node_1 = Queue(i32).Node{ .data = 1, .next = undefined, + .prev = undefined, }; queue.put(&node_1); @@ -212,12 +245,14 @@ test "std.atomic.Queue single-threaded" { var node_2 = Queue(i32).Node{ .data = 2, .next = undefined, + .prev = undefined, }; queue.put(&node_2); var node_3 = Queue(i32).Node{ .data = 3, .next = undefined, + .prev = undefined, }; queue.put(&node_3); @@ -228,6 +263,7 @@ test "std.atomic.Queue single-threaded" { var node_4 = Queue(i32).Node{ .data = 4, .next = undefined, + .prev = undefined, }; queue.put(&node_4); -- cgit v1.2.3