aboutsummaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-08-02 17:04:17 -0400
committerAndrew Kelley <superjoe30@gmail.com>2018-08-02 17:04:17 -0400
commit821805aa92898cfdb770b87ac916e45e428621b8 (patch)
tree9fc4468728398b92c5d3c4bdf62fd57434abf9ee /std
parente3ae2cfb5243e7255bf4dbcc8a9b7e77a31e9d45 (diff)
downloadzig-821805aa92898cfdb770b87ac916e45e428621b8.tar.gz
zig-821805aa92898cfdb770b87ac916e45e428621b8.zip
WIP: Channel.getOrNull
Diffstat (limited to 'std')
-rw-r--r--std/atomic/queue.zig84
-rw-r--r--std/event.zig2
-rw-r--r--std/event/channel.zig197
-rw-r--r--std/event/fs.zig11
-rw-r--r--std/event/lock.zig18
-rw-r--r--std/event/loop.zig11
-rw-r--r--std/event/rwlock.zig2
-rw-r--r--std/index.zig1
-rw-r--r--std/linked_list.zig101
9 files changed, 269 insertions, 158 deletions
diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig
index df31c88d2a..6948af43ba 100644
--- a/std/atomic/queue.zig
+++ b/std/atomic/queue.zig
@@ -1,40 +1,38 @@
+const std = @import("../index.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
+const assert = std.debug.assert;
/// Many producer, many consumer, non-allocating, thread-safe.
-/// Uses a spinlock to protect get() and put().
+/// Uses a mutex to protect access.
pub fn Queue(comptime T: type) type {
return struct {
head: ?*Node,
tail: ?*Node,
- lock: u8,
+ mutex: std.Mutex,
pub const Self = this;
-
- pub const Node = struct {
- next: ?*Node,
- data: T,
- };
+ pub const Node = std.LinkedList(T).Node;
pub fn init() Self {
return Self{
.head = null,
.tail = null,
- .lock = 0,
+ .mutex = std.Mutex.init(),
};
}
pub fn put(self: *Self, node: *Node) void {
node.next = null;
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
- const opt_tail = self.tail;
+ node.prev = self.tail;
self.tail = node;
- if (opt_tail) |tail| {
- tail.next = node;
+ if (node.prev) |prev_tail| {
+ prev_tail.next = node;
} else {
assert(self.head == null);
self.head = node;
@@ -42,18 +40,27 @@ pub fn Queue(comptime T: type) type {
}
pub fn get(self: *Self) ?*Node {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
const head = self.head orelse return null;
self.head = head.next;
- if (head.next == null) self.tail = null;
+ if (head.next) |new_head| {
+ new_head.prev = null;
+ } else {
+ self.tail = null;
+ }
+ // This way, a get() and a remove() are thread-safe with each other.
+ head.prev = null;
+ head.next = null;
return head;
}
pub fn unget(self: *Self, node: *Node) void {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ node.prev = null;
+
+ const held = self.mutex.acquire();
+ defer held.release();
const opt_head = self.head;
self.head = node;
@@ -65,13 +72,39 @@ pub fn Queue(comptime T: type) type {
}
}
+ /// Thread-safe with get() and remove(). Returns whether node was actually removed.
+ pub fn remove(self: *Self, node: *Node) bool {
+ const held = self.mutex.acquire();
+ defer held.release();
+
+ if (node.prev == null and node.next == null and self.head != node) {
+ return false;
+ }
+
+ if (node.prev) |prev| {
+ prev.next = node.next;
+ } else {
+ self.head = node.next;
+ }
+ if (node.next) |next| {
+ next.prev = node.prev;
+ } else {
+ self.tail = node.prev;
+ }
+ node.prev = null;
+ node.next = null;
+ return true;
+ }
+
pub fn isEmpty(self: *Self) bool {
- return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null;
+ const held = self.mutex.acquire();
+ defer held.release();
+ return self.head != null;
}
pub fn dump(self: *Self) void {
- while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
- defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+ const held = self.mutex.acquire();
+ defer held.release();
std.debug.warn("head: ");
dumpRecursive(self.head, 0);
@@ -93,9 +126,6 @@ pub fn Queue(comptime T: type) type {
};
}
-const std = @import("../index.zig");
-const assert = std.debug.assert;
-
const Context = struct {
allocator: *std.mem.Allocator,
queue: *Queue(i32),
@@ -169,6 +199,7 @@ fn startPuts(ctx: *Context) u8 {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
const x = @bitCast(i32, r.random.scalar(u32));
const node = ctx.allocator.create(Queue(i32).Node{
+ .prev = undefined,
.next = undefined,
.data = x,
}) catch unreachable;
@@ -198,12 +229,14 @@ test "std.atomic.Queue single-threaded" {
var node_0 = Queue(i32).Node{
.data = 0,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_0);
var node_1 = Queue(i32).Node{
.data = 1,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_1);
@@ -212,12 +245,14 @@ test "std.atomic.Queue single-threaded" {
var node_2 = Queue(i32).Node{
.data = 2,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_2);
var node_3 = Queue(i32).Node{
.data = 3,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_3);
@@ -228,6 +263,7 @@ test "std.atomic.Queue single-threaded" {
var node_4 = Queue(i32).Node{
.data = 4,
.next = undefined,
+ .prev = undefined,
};
queue.put(&node_4);
diff --git a/std/event.zig b/std/event.zig
index 193ccbf3e6..bd3262a575 100644
--- a/std/event.zig
+++ b/std/event.zig
@@ -3,7 +3,7 @@ pub const Future = @import("event/future.zig").Future;
pub const Group = @import("event/group.zig").Group;
pub const Lock = @import("event/lock.zig").Lock;
pub const Locked = @import("event/locked.zig").Locked;
-pub const RwLock = @import("event/rwlock.zig").Lock;
+pub const RwLock = @import("event/rwlock.zig").RwLock;
pub const RwLocked = @import("event/rwlocked.zig").RwLocked;
pub const Loop = @import("event/loop.zig").Loop;
pub const fs = @import("event/fs.zig");
diff --git a/std/event/channel.zig b/std/event/channel.zig
index 61e470fa4e..aa17b0db65 100644
--- a/std/event/channel.zig
+++ b/std/event/channel.zig
@@ -5,7 +5,7 @@ const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;
-/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
+/// many producer, many consumer, thread-safe, runtime configurable buffer size
/// when buffer is empty, consumers suspend and are resumed by producers
/// when buffer is full, producers suspend and are resumed by consumers
pub fn Channel(comptime T: type) type {
@@ -13,6 +13,7 @@ pub fn Channel(comptime T: type) type {
loop: *Loop,
getters: std.atomic.Queue(GetNode),
+ or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
putters: std.atomic.Queue(PutNode),
get_count: usize,
put_count: usize,
@@ -26,8 +27,22 @@ pub fn Channel(comptime T: type) type {
const SelfChannel = this;
const GetNode = struct {
- ptr: *T,
tick_node: *Loop.NextTickNode,
+ data: Data,
+
+ const Data = union(enum) {
+ Normal: Normal,
+ OrNull: OrNull,
+ };
+
+ const Normal = struct {
+ ptr: *T,
+ };
+
+ const OrNull = struct {
+ ptr: *?T,
+ or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node,
+ };
};
const PutNode = struct {
data: T,
@@ -48,6 +63,7 @@ pub fn Channel(comptime T: type) type {
.need_dispatch = 0,
.getters = std.atomic.Queue(GetNode).init(),
.putters = std.atomic.Queue(PutNode).init(),
+ .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
.get_count = 0,
.put_count = 0,
});
@@ -71,18 +87,31 @@ pub fn Channel(comptime T: type) type {
/// puts a data item in the channel. The promise completes when the value has been added to the
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
pub async fn put(self: *SelfChannel, data: T) void {
+ // TODO fix this workaround
+ var my_handle: promise = undefined;
+ suspend |p| {
+ my_handle = p;
+ resume p;
+ }
+
+ var my_tick_node = Loop.NextTickNode.init(my_handle);
+ var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
+ .tick_node = &my_tick_node,
+ .data = data,
+ });
+
+ // TODO test canceling a put()
+ errdefer {
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ const need_dispatch = !self.putters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the put_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.dispatch();
+ }
+ }
suspend |handle| {
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.Queue(PutNode).Node{
- .data = PutNode{
- .tick_node = &my_tick_node,
- .data = data,
- },
- .next = undefined,
- };
self.putters.put(&queue_node);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
@@ -93,21 +122,37 @@ pub fn Channel(comptime T: type) type {
/// await this function to get an item from the channel. If the buffer is empty, the promise will
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
+ // TODO fix this workaround
+ var my_handle: promise = undefined;
+ suspend |p| {
+ my_handle = p;
+ resume p;
+ }
+
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: T = undefined;
- suspend |handle| {
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.Queue(GetNode).Node{
- .data = GetNode{
- .ptr = &result,
- .tick_node = &my_tick_node,
- },
- .next = undefined,
- };
+ var my_tick_node = Loop.NextTickNode.init(my_handle);
+ var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
+ .tick_node = &my_tick_node,
+ .data = GetNode.Data{
+ .Normal = GetNode.Normal{ .ptr = &result },
+ },
+ });
+
+ // TODO test canceling a get()
+ errdefer {
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ const need_dispatch = !self.getters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the get_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.dispatch();
+ }
+ }
+
+ suspend |_| {
self.getters.put(&queue_node);
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
@@ -116,8 +161,64 @@ pub fn Channel(comptime T: type) type {
return result;
}
- fn getOrNull(self: *SelfChannel) ?T {
- TODO();
+ //pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion {
+ // assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch
+ // assert(channels.len != 0); // enum unions cannot have 0 fields
+ // if (channels.len == 1) {
+ // const result = await (async channels[0].get() catch unreachable);
+ // return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result);
+ // }
+ //}
+
+ /// Await this function to get an item from the channel. If the buffer is empty and there are no
+ /// puts waiting, this returns null.
+ /// Await is necessary for locking purposes. The function will be resumed after checking the channel
+ /// for data and will not wait for data to be available.
+ pub async fn getOrNull(self: *SelfChannel) ?T {
+ // TODO fix this workaround
+ var my_handle: promise = undefined;
+ suspend |p| {
+ my_handle = p;
+ resume p;
+ }
+
+ // TODO integrate this function with named return values
+ // so we can get rid of this extra result copy
+ var result: ?T = null;
+ var my_tick_node = Loop.NextTickNode.init(my_handle);
+ var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined);
+ var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
+ .tick_node = &my_tick_node,
+ .data = GetNode.Data{
+ .OrNull = GetNode.OrNull{
+ .ptr = &result,
+ .or_null = &or_null_node,
+ },
+ },
+ });
+ or_null_node.data = &queue_node;
+
+ // TODO test canceling getOrNull
+ errdefer {
+ _ = self.or_null_queue.remove(&or_null_node);
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ const need_dispatch = !self.getters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the get_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.dispatch();
+ }
+ }
+
+ suspend |_| {
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.or_null_queue.put(&or_null_node);
+
+ self.dispatch();
+ }
+ return result;
}
fn dispatch(self: *SelfChannel) void {
@@ -143,7 +244,15 @@ pub fn Channel(comptime T: type) type {
if (get_count == 0) break :one_dispatch;
const get_node = &self.getters.get().?.data;
- get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ switch (get_node.data) {
+ GetNode.Data.Normal => |info| {
+ info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ },
+ GetNode.Data.OrNull => |info| {
+ _ = self.or_null_queue.remove(info.or_null);
+ info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ },
+ }
self.loop.onNextTick(get_node.tick_node);
self.buffer_len -= 1;
@@ -155,7 +264,15 @@ pub fn Channel(comptime T: type) type {
const get_node = &self.getters.get().?.data;
const put_node = &self.putters.get().?.data;
- get_node.ptr.* = put_node.data;
+ switch (get_node.data) {
+ GetNode.Data.Normal => |info| {
+ info.ptr.* = put_node.data;
+ },
+ GetNode.Data.OrNull => |info| {
+ _ = self.or_null_queue.remove(info.or_null);
+ info.ptr.* = put_node.data;
+ },
+ }
self.loop.onNextTick(get_node.tick_node);
self.loop.onNextTick(put_node.tick_node);
@@ -180,6 +297,16 @@ pub fn Channel(comptime T: type) type {
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ // All the "get or null" functions should resume now.
+ var remove_count: usize = 0;
+ while (self.or_null_queue.get()) |or_null_node| {
+ remove_count += @boolToInt(self.getters.remove(or_null_node.data));
+ self.loop.onNextTick(or_null_node.data.data.tick_node);
+ }
+ if (remove_count != 0) {
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, remove_count, AtomicOrder.SeqCst);
+ }
+
// clear need-dispatch flag
const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
if (need_dispatch != 0) continue;
@@ -230,6 +357,15 @@ async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
const value2_promise = try async channel.get();
const value2 = await value2_promise;
assert(value2 == 4567);
+
+ const value3_promise = try async channel.getOrNull();
+ const value3 = await value3_promise;
+ assert(value3 == null);
+
+ const last_put = try async testPut(channel, 4444);
+ const value4 = await try async channel.getOrNull();
+ assert(value4.? == 4444);
+ await last_put;
}
async fn testChannelPutter(channel: *Channel(i32)) void {
@@ -237,3 +373,6 @@ async fn testChannelPutter(channel: *Channel(i32)) void {
await (async channel.put(4567) catch @panic("out of memory"));
}
+async fn testPut(channel: *Channel(i32), value: i32) void {
+ await (async channel.put(value) catch @panic("out of memory"));
+}
diff --git a/std/event/fs.zig b/std/event/fs.zig
index d002651ac9..517f08db48 100644
--- a/std/event/fs.zig
+++ b/std/event/fs.zig
@@ -99,6 +99,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
}
var req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -111,6 +112,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = my_handle,
},
@@ -148,6 +150,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
}
var req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -160,6 +163,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = my_handle,
},
@@ -186,6 +190,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -196,6 +201,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = my_handle,
},
@@ -227,6 +233,7 @@ pub async fn openReadWrite(
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -238,6 +245,7 @@ pub async fn openReadWrite(
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = my_handle,
},
@@ -267,6 +275,7 @@ pub const CloseOperation = struct {
.loop = loop,
.have_fd = false,
.close_req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -312,6 +321,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
+ .prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@@ -324,6 +334,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = my_handle,
},
diff --git a/std/event/lock.zig b/std/event/lock.zig
index 0632960f80..84caeae4f9 100644
--- a/std/event/lock.zig
+++ b/std/event/lock.zig
@@ -91,13 +91,16 @@ pub const Lock = struct {
}
pub async fn acquire(self: *Lock) Held {
- suspend |handle| {
- // TODO explicitly put this memory in the coroutine frame #1194
- var my_tick_node = Loop.NextTickNode{
- .data = handle,
- .next = undefined,
- };
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var my_handle: promise = undefined;
+ suspend |p| {
+ my_handle = p;
+ resume p;
+ }
+ var my_tick_node = Loop.NextTickNode.init(my_handle);
+ errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
+ suspend |_| {
self.queue.put(&my_tick_node);
// At this point, we are in the queue, so we might have already been resumed and this coroutine
@@ -170,6 +173,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
}
const handle1 = async lockRunner(lock) catch @panic("out of memory");
var tick_node1 = Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = handle1,
};
@@ -177,6 +181,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
const handle2 = async lockRunner(lock) catch @panic("out of memory");
var tick_node2 = Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = handle2,
};
@@ -184,6 +189,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
const handle3 = async lockRunner(lock) catch @panic("out of memory");
var tick_node3 = Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = handle3,
};
diff --git a/std/event/loop.zig b/std/event/loop.zig
index d0794d6975..5a87b7a598 100644
--- a/std/event/loop.zig
+++ b/std/event/loop.zig
@@ -120,6 +120,7 @@ pub const Loop = struct {
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
self.os_data.fs_end_request = fs.RequestNode{
+ .prev = undefined,
.next = undefined,
.data = fs.Request{
.msg = fs.Request.Msg.End,
@@ -206,6 +207,7 @@ pub const Loop = struct {
.udata = @ptrToInt(&eventfd_node.data.base),
},
},
+ .prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
@@ -270,6 +272,7 @@ pub const Loop = struct {
// this one is for sending events
.completion_key = @ptrToInt(&eventfd_node.data.base),
},
+ .prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
@@ -422,6 +425,12 @@ pub const Loop = struct {
self.dispatch();
}
+ pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
+ if (self.next_tick_queue.remove(node)) {
+ self.finishOneEvent();
+ }
+ }
+
pub fn run(self: *Loop) void {
self.finishOneEvent(); // the reference we start with
@@ -443,6 +452,7 @@ pub const Loop = struct {
suspend |p| {
handle.* = p;
var my_tick_node = Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = p,
};
@@ -464,6 +474,7 @@ pub const Loop = struct {
pub async fn yield(self: *Loop) void {
suspend |p| {
var my_tick_node = Loop.NextTickNode{
+ .prev = undefined,
.next = undefined,
.data = p,
};
diff --git a/std/event/rwlock.zig b/std/event/rwlock.zig
index 07b7340fca..cbcdff06af 100644
--- a/std/event/rwlock.zig
+++ b/std/event/rwlock.zig
@@ -101,6 +101,7 @@ pub const RwLock = struct {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
+ .prev = undefined,
.next = undefined,
};
@@ -133,6 +134,7 @@ pub const RwLock = struct {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
+ .prev = undefined,
.next = undefined,
};
diff --git a/std/index.zig b/std/index.zig
index 23599c8c96..5f24d66efc 100644
--- a/std/index.zig
+++ b/std/index.zig
@@ -6,7 +6,6 @@ pub const Buffer = @import("buffer.zig").Buffer;
pub const BufferOutStream = @import("buffer.zig").BufferOutStream;
pub const HashMap = @import("hash_map.zig").HashMap;
pub const LinkedList = @import("linked_list.zig").LinkedList;
-pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList;
pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
pub const DynLib = @import("dynamic_library.zig").DynLib;
pub const Mutex = @import("mutex.zig").Mutex;
diff --git a/std/linked_list.zig b/std/linked_list.zig
index 62cd5ca2bb..130ddbce5d 100644
--- a/std/linked_list.zig
+++ b/std/linked_list.zig
@@ -4,18 +4,8 @@ const assert = debug.assert;
const mem = std.mem;
const Allocator = mem.Allocator;
-/// Generic non-intrusive doubly linked list.
-pub fn LinkedList(comptime T: type) type {
- return BaseLinkedList(T, void, "");
-}
-
-/// Generic intrusive doubly linked list.
-pub fn IntrusiveLinkedList(comptime ParentType: type, comptime field_name: []const u8) type {
- return BaseLinkedList(void, ParentType, field_name);
-}
-
/// Generic doubly linked list.
-fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_name: []const u8) type {
+pub fn LinkedList(comptime T: type) type {
return struct {
const Self = this;
@@ -25,23 +15,13 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
next: ?*Node,
data: T,
- pub fn init(value: *const T) Node {
+ pub fn init(data: T) Node {
return Node{
.prev = null,
.next = null,
- .data = value.*,
+ .data = data,
};
}
-
- pub fn initIntrusive() Node {
- // TODO: when #678 is solved this can become `init`.
- return Node.init({});
- }
-
- pub fn toData(node: *Node) *ParentType {
- comptime assert(isIntrusive());
- return @fieldParentPtr(ParentType, field_name, node);
- }
};
first: ?*Node,
@@ -60,10 +40,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
};
}
- fn isIntrusive() bool {
- return ParentType != void or field_name.len != 0;
- }
-
/// Insert a new node after an existing one.
///
/// Arguments:
@@ -192,7 +168,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
/// Returns:
/// A pointer to the new node.
pub fn allocateNode(list: *Self, allocator: *Allocator) !*Node {
- comptime assert(!isIntrusive());
return allocator.create(Node(undefined));
}
@@ -202,7 +177,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
/// node: Pointer to the node to deallocate.
/// allocator: Dynamic memory allocator.
pub fn destroyNode(list: *Self, node: *Node, allocator: *Allocator) void {
- comptime assert(!isIntrusive());
allocator.destroy(node);
}
@@ -214,8 +188,7 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
///
/// Returns:
/// A pointer to the new node.
- pub fn createNode(list: *Self, data: *const T, allocator: *Allocator) !*Node {
- comptime assert(!isIntrusive());
+ pub fn createNode(list: *Self, data: T, allocator: *Allocator) !*Node {
var node = try list.allocateNode(allocator);
node.* = Node.init(data);
return node;
@@ -274,69 +247,3 @@ test "basic linked list test" {
assert(list.last.?.data == 4);
assert(list.len == 2);
}
-
-const ElementList = IntrusiveLinkedList(Element, "link");
-const Element = struct {
- value: u32,
- link: IntrusiveLinkedList(Element, "link").Node,
-};
-
-test "basic intrusive linked list test" {
- const allocator = debug.global_allocator;
- var list = ElementList.init();
-
- var one = Element{
- .value = 1,
- .link = ElementList.Node.initIntrusive(),
- };
- var two = Element{
- .value = 2,
- .link = ElementList.Node.initIntrusive(),
- };
- var three = Element{
- .value = 3,
- .link = ElementList.Node.initIntrusive(),
- };
- var four = Element{
- .value = 4,
- .link = ElementList.Node.initIntrusive(),
- };
- var five = Element{
- .value = 5,
- .link = ElementList.Node.initIntrusive(),
- };
-
- list.append(&two.link); // {2}
- list.append(&five.link); // {2, 5}
- list.prepend(&one.link); // {1, 2, 5}
- list.insertBefore(&five.link, &four.link); // {1, 2, 4, 5}
- list.insertAfter(&two.link, &three.link); // {1, 2, 3, 4, 5}
-
- // Traverse forwards.
- {
- var it = list.first;
- var index: u32 = 1;
- while (it) |node| : (it = node.next) {
- assert(node.toData().value == index);
- index += 1;
- }
- }
-
- // Traverse backwards.
- {
- var it = list.last;
- var index: u32 = 1;
- while (it) |node| : (it = node.prev) {
- assert(node.toData().value == (6 - index));
- index += 1;
- }
- }
-
- var first = list.popFirst(); // {2, 3, 4, 5}
- var last = list.pop(); // {2, 3, 4}
- list.remove(&three.link); // {2, 4}
-
- assert(list.first.?.toData().value == 2);
- assert(list.last.?.toData().value == 4);
- assert(list.len == 2);
-}