aboutsummaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-04-29 12:29:40 -0400
committerGitHub <noreply@github.com>2018-04-29 12:29:40 -0400
commitf37e79e720215a3a41c603fe41d05d6910c79de2 (patch)
tree551877ba83cb7f5abc731efd25a21ab7fe55b170 /std
parent0bb054e5e7ccb164ea1649608d2f6e4195519cb2 (diff)
parentc76b0a845fb4176479c8bbf915e57dbdfdb7a594 (diff)
downloadzig-f37e79e720215a3a41c603fe41d05d6910c79de2.tar.gz
zig-f37e79e720215a3a41c603fe41d05d6910c79de2.zip
Merge pull request #963 from zig-lang/atomic-stack-and-queue
Atomic stack and queue
Diffstat (limited to 'std')
-rw-r--r--std/atomic/index.zig7
-rw-r--r--std/atomic/queue.zig120
-rw-r--r--std/atomic/stack.zig126
-rw-r--r--std/c/darwin.zig5
-rw-r--r--std/c/index.zig10
-rw-r--r--std/c/linux.zig5
-rw-r--r--std/heap.zig70
-rw-r--r--std/index.zig2
-rw-r--r--std/mem.zig1
-rw-r--r--std/os/darwin.zig8
-rw-r--r--std/os/index.zig223
-rw-r--r--std/os/linux/index.zig6
-rw-r--r--std/os/test.zig20
-rw-r--r--std/os/windows/index.zig6
14 files changed, 519 insertions, 90 deletions
diff --git a/std/atomic/index.zig b/std/atomic/index.zig
new file mode 100644
index 0000000000..9d556a6415
--- /dev/null
+++ b/std/atomic/index.zig
@@ -0,0 +1,7 @@
+pub const Stack = @import("stack.zig").Stack;
+pub const Queue = @import("queue.zig").Queue;
+
+test "std.atomic" {
+ _ = @import("stack.zig").Stack;
+ _ = @import("queue.zig").Queue;
+}
diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig
new file mode 100644
index 0000000000..1acecbab2c
--- /dev/null
+++ b/std/atomic/queue.zig
@@ -0,0 +1,120 @@
+const builtin = @import("builtin");
+const AtomicOrder = builtin.AtomicOrder;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+
+/// Many reader, many writer, non-allocating, thread-safe, lock-free
+pub fn Queue(comptime T: type) type {
+ return struct {
+ head: &Node,
+ tail: &Node,
+ root: Node,
+
+ pub const Self = this;
+
+ pub const Node = struct {
+ next: ?&Node,
+ data: T,
+ };
+
+ // TODO: well defined copy elision: https://github.com/zig-lang/zig/issues/287
+ pub fn init(self: &Self) void {
+ self.root.next = null;
+ self.head = &self.root;
+ self.tail = &self.root;
+ }
+
+ pub fn put(self: &Self, node: &Node) void {
+ node.next = null;
+
+ const tail = @atomicRmw(&Node, &self.tail, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
+ _ = @atomicRmw(?&Node, &tail.next, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
+ }
+
+ pub fn get(self: &Self) ?&Node {
+ var head = @atomicLoad(&Node, &self.head, AtomicOrder.Acquire);
+ while (true) {
+ const node = head.next ?? return null;
+ head = @cmpxchgWeak(&Node, &self.head, head, node, AtomicOrder.Release, AtomicOrder.Acquire) ?? return node;
+ }
+ }
+ };
+}
+
+const std = @import("std");
+const Context = struct {
+ allocator: &std.mem.Allocator,
+ queue: &Queue(i32),
+ put_sum: isize,
+ get_sum: isize,
+ get_count: usize,
+ puts_done: u8, // TODO make this a bool
+};
+const puts_per_thread = 10000;
+const put_thread_count = 3;
+
+test "std.atomic.queue" {
+ var direct_allocator = std.heap.DirectAllocator.init();
+ defer direct_allocator.deinit();
+
+ var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024);
+ defer direct_allocator.allocator.free(plenty_of_memory);
+
+ var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
+ var a = &fixed_buffer_allocator.allocator;
+
+ var queue: Queue(i32) = undefined;
+ queue.init();
+ var context = Context {
+ .allocator = a,
+ .queue = &queue,
+ .put_sum = 0,
+ .get_sum = 0,
+ .puts_done = 0,
+ .get_count = 0,
+ };
+
+ var putters: [put_thread_count]&std.os.Thread = undefined;
+ for (putters) |*t| {
+ *t = try std.os.spawnThread(&context, startPuts);
+ }
+ var getters: [put_thread_count]&std.os.Thread = undefined;
+ for (getters) |*t| {
+ *t = try std.os.spawnThread(&context, startGets);
+ }
+
+ for (putters) |t| t.wait();
+ _ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ for (getters) |t| t.wait();
+
+ std.debug.assert(context.put_sum == context.get_sum);
+ std.debug.assert(context.get_count == puts_per_thread * put_thread_count);
+}
+
+fn startPuts(ctx: &Context) u8 {
+ var put_count: usize = puts_per_thread;
+ var r = std.rand.DefaultPrng.init(0xdeadbeef);
+ while (put_count != 0) : (put_count -= 1) {
+ std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+ const x = @bitCast(i32, r.random.scalar(u32));
+ const node = ctx.allocator.create(Queue(i32).Node) catch unreachable;
+ node.data = x;
+ ctx.queue.put(node);
+ _ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst);
+ }
+ return 0;
+}
+
+fn startGets(ctx: &Context) u8 {
+ while (true) {
+ while (ctx.queue.get()) |node| {
+ std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+ _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst);
+ _ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
+ }
+
+ if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) {
+ break;
+ }
+ }
+ return 0;
+}
diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig
new file mode 100644
index 0000000000..accbcc942a
--- /dev/null
+++ b/std/atomic/stack.zig
@@ -0,0 +1,126 @@
+const builtin = @import("builtin");
+const AtomicOrder = builtin.AtomicOrder;
+
+/// Many reader, many writer, non-allocating, thread-safe, lock-free
+pub fn Stack(comptime T: type) type {
+ return struct {
+ root: ?&Node,
+
+ pub const Self = this;
+
+ pub const Node = struct {
+ next: ?&Node,
+ data: T,
+ };
+
+ pub fn init() Self {
+ return Self {
+ .root = null,
+ };
+ }
+
+ /// push operation, but only if you are the first item in the stack. if you did not succeed in
+ /// being the first item in the stack, returns the other item that was there.
+ pub fn pushFirst(self: &Self, node: &Node) ?&Node {
+ node.next = null;
+ return @cmpxchgStrong(?&Node, &self.root, null, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst);
+ }
+
+ pub fn push(self: &Self, node: &Node) void {
+ var root = @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst);
+ while (true) {
+ node.next = root;
+ root = @cmpxchgWeak(?&Node, &self.root, root, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? break;
+ }
+ }
+
+ pub fn pop(self: &Self) ?&Node {
+ var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire);
+ while (true) {
+ root = @cmpxchgWeak(?&Node, &self.root, root, (root ?? return null).next, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? return root;
+ }
+ }
+
+ pub fn isEmpty(self: &Self) bool {
+ return @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst) == null;
+ }
+ };
+}
+
+const std = @import("std");
+const Context = struct {
+ allocator: &std.mem.Allocator,
+ stack: &Stack(i32),
+ put_sum: isize,
+ get_sum: isize,
+ get_count: usize,
+ puts_done: u8, // TODO make this a bool
+};
+const puts_per_thread = 1000;
+const put_thread_count = 3;
+
+test "std.atomic.stack" {
+ var direct_allocator = std.heap.DirectAllocator.init();
+ defer direct_allocator.deinit();
+
+ var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024);
+ defer direct_allocator.allocator.free(plenty_of_memory);
+
+ var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
+ var a = &fixed_buffer_allocator.allocator;
+
+ var stack = Stack(i32).init();
+ var context = Context {
+ .allocator = a,
+ .stack = &stack,
+ .put_sum = 0,
+ .get_sum = 0,
+ .puts_done = 0,
+ .get_count = 0,
+ };
+
+ var putters: [put_thread_count]&std.os.Thread = undefined;
+ for (putters) |*t| {
+ *t = try std.os.spawnThread(&context, startPuts);
+ }
+ var getters: [put_thread_count]&std.os.Thread = undefined;
+ for (getters) |*t| {
+ *t = try std.os.spawnThread(&context, startGets);
+ }
+
+ for (putters) |t| t.wait();
+ _ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ for (getters) |t| t.wait();
+
+ std.debug.assert(context.put_sum == context.get_sum);
+ std.debug.assert(context.get_count == puts_per_thread * put_thread_count);
+}
+
+fn startPuts(ctx: &Context) u8 {
+ var put_count: usize = puts_per_thread;
+ var r = std.rand.DefaultPrng.init(0xdeadbeef);
+ while (put_count != 0) : (put_count -= 1) {
+ std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+ const x = @bitCast(i32, r.random.scalar(u32));
+ const node = ctx.allocator.create(Stack(i32).Node) catch unreachable;
+ node.data = x;
+ ctx.stack.push(node);
+ _ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst);
+ }
+ return 0;
+}
+
+fn startGets(ctx: &Context) u8 {
+ while (true) {
+ while (ctx.stack.pop()) |node| {
+ std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
+ _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst);
+ _ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
+ }
+
+ if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) {
+ break;
+ }
+ }
+ return 0;
+}
diff --git a/std/c/darwin.zig b/std/c/darwin.zig
index b958055ae8..7ac57514c9 100644
--- a/std/c/darwin.zig
+++ b/std/c/darwin.zig
@@ -81,3 +81,8 @@ pub const sockaddr = extern struct {
};
pub const sa_family_t = u8;
+
+pub const pthread_attr_t = extern struct {
+ __sig: c_long,
+ __opaque: [56]u8,
+};
diff --git a/std/c/index.zig b/std/c/index.zig
index cff86f4041..34269d2aa2 100644
--- a/std/c/index.zig
+++ b/std/c/index.zig
@@ -53,3 +53,13 @@ pub extern "c" fn malloc(usize) ?&c_void;
pub extern "c" fn realloc(&c_void, usize) ?&c_void;
pub extern "c" fn free(&c_void) void;
pub extern "c" fn posix_memalign(memptr: &&c_void, alignment: usize, size: usize) c_int;
+
+pub extern "pthread" fn pthread_create(noalias newthread: &pthread_t,
+ noalias attr: ?&const pthread_attr_t, start_routine: extern fn(?&c_void) ?&c_void,
+ noalias arg: ?&c_void) c_int;
+pub extern "pthread" fn pthread_attr_init(attr: &pthread_attr_t) c_int;
+pub extern "pthread" fn pthread_attr_setstack(attr: &pthread_attr_t, stackaddr: &c_void, stacksize: usize) c_int;
+pub extern "pthread" fn pthread_attr_destroy(attr: &pthread_attr_t) c_int;
+pub extern "pthread" fn pthread_join(thread: pthread_t, arg_return: ?&?&c_void) c_int;
+
+pub const pthread_t = &@OpaqueType();
diff --git a/std/c/linux.zig b/std/c/linux.zig
index b2ac05eba5..7810fec130 100644
--- a/std/c/linux.zig
+++ b/std/c/linux.zig
@@ -3,3 +3,8 @@ pub use @import("../os/linux/errno.zig");
pub extern "c" fn getrandom(buf_ptr: &u8, buf_len: usize, flags: c_uint) c_int;
extern "c" fn __errno_location() &c_int;
pub const _errno = __errno_location;
+
+pub const pthread_attr_t = extern struct {
+ __size: [56]u8,
+ __align: c_long,
+};
diff --git a/std/heap.zig b/std/heap.zig
index b3a1e6bf27..bfdf62f658 100644
--- a/std/heap.zig
+++ b/std/heap.zig
@@ -47,13 +47,6 @@ pub const DirectAllocator = struct {
const HeapHandle = if (builtin.os == Os.windows) os.windows.HANDLE else void;
- //pub const canary_bytes = []u8 {48, 239, 128, 46, 18, 49, 147, 9, 195, 59, 203, 3, 245, 54, 9, 122};
- //pub const want_safety = switch (builtin.mode) {
- // builtin.Mode.Debug => true,
- // builtin.Mode.ReleaseSafe => true,
- // else => false,
- //};
-
pub fn init() DirectAllocator {
return DirectAllocator {
.allocator = Allocator {
@@ -98,7 +91,7 @@ pub const DirectAllocator = struct {
const unused_start = addr;
const unused_len = aligned_addr - 1 - unused_start;
- var err = p.munmap(@intToPtr(&u8, unused_start), unused_len);
+ var err = p.munmap(unused_start, unused_len);
debug.assert(p.getErrno(err) == 0);
//It is impossible that there is an unoccupied page at the top of our
@@ -139,7 +132,7 @@ pub const DirectAllocator = struct {
const rem = @rem(new_addr_end, os.page_size);
const new_addr_end_rounded = new_addr_end + if (rem == 0) 0 else (os.page_size - rem);
if (old_addr_end > new_addr_end_rounded) {
- _ = os.posix.munmap(@intToPtr(&u8, new_addr_end_rounded), old_addr_end - new_addr_end_rounded);
+ _ = os.posix.munmap(new_addr_end_rounded, old_addr_end - new_addr_end_rounded);
}
return old_mem[0..new_size];
}
@@ -177,7 +170,7 @@ pub const DirectAllocator = struct {
switch (builtin.os) {
Os.linux, Os.macosx, Os.ios => {
- _ = os.posix.munmap(bytes.ptr, bytes.len);
+ _ = os.posix.munmap(@ptrToInt(bytes.ptr), bytes.len);
},
Os.windows => {
const record_addr = @ptrToInt(bytes.ptr) + bytes.len;
@@ -298,7 +291,7 @@ pub const FixedBufferAllocator = struct {
fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 {
const self = @fieldParentPtr(FixedBufferAllocator, "allocator", allocator);
- const addr = @ptrToInt(&self.buffer[self.end_index]);
+ const addr = @ptrToInt(self.buffer.ptr) + self.end_index;
const rem = @rem(addr, alignment);
const march_forward_bytes = if (rem == 0) 0 else (alignment - rem);
const adjusted_index = self.end_index + march_forward_bytes;
@@ -325,6 +318,54 @@ pub const FixedBufferAllocator = struct {
fn free(allocator: &Allocator, bytes: []u8) void { }
};
+/// lock free
+pub const ThreadSafeFixedBufferAllocator = struct {
+ allocator: Allocator,
+ end_index: usize,
+ buffer: []u8,
+
+ pub fn init(buffer: []u8) ThreadSafeFixedBufferAllocator {
+ return ThreadSafeFixedBufferAllocator {
+ .allocator = Allocator {
+ .allocFn = alloc,
+ .reallocFn = realloc,
+ .freeFn = free,
+ },
+ .buffer = buffer,
+ .end_index = 0,
+ };
+ }
+
+ fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 {
+ const self = @fieldParentPtr(ThreadSafeFixedBufferAllocator, "allocator", allocator);
+ var end_index = @atomicLoad(usize, &self.end_index, builtin.AtomicOrder.SeqCst);
+ while (true) {
+ const addr = @ptrToInt(self.buffer.ptr) + end_index;
+ const rem = @rem(addr, alignment);
+ const march_forward_bytes = if (rem == 0) 0 else (alignment - rem);
+ const adjusted_index = end_index + march_forward_bytes;
+ const new_end_index = adjusted_index + n;
+ if (new_end_index > self.buffer.len) {
+ return error.OutOfMemory;
+ }
+ end_index = @cmpxchgWeak(usize, &self.end_index, end_index, new_end_index,
+ builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) ?? return self.buffer[adjusted_index .. new_end_index];
+ }
+ }
+
+ fn realloc(allocator: &Allocator, old_mem: []u8, new_size: usize, alignment: u29) ![]u8 {
+ if (new_size <= old_mem.len) {
+ return old_mem[0..new_size];
+ } else {
+ const result = try alloc(allocator, new_size, alignment);
+ mem.copy(u8, result, old_mem);
+ return result;
+ }
+ }
+
+ fn free(allocator: &Allocator, bytes: []u8) void { }
+};
+
test "c_allocator" {
@@ -363,6 +404,13 @@ test "FixedBufferAllocator" {
try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator);
}
+test "ThreadSafeFixedBufferAllocator" {
+ var fixed_buffer_allocator = ThreadSafeFixedBufferAllocator.init(test_fixed_buffer_allocator_memory[0..]);
+
+ try testAllocator(&fixed_buffer_allocator.allocator);
+ try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator);
+}
+
fn testAllocator(allocator: &mem.Allocator) !void {
var slice = try allocator.alloc(&i32, 100);
diff --git a/std/index.zig b/std/index.zig
index 07c4360aab..d6a1e3c94d 100644
--- a/std/index.zig
+++ b/std/index.zig
@@ -8,6 +8,7 @@ pub const HashMap = @import("hash_map.zig").HashMap;
pub const LinkedList = @import("linked_list.zig").LinkedList;
pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList;
+pub const atomic = @import("atomic/index.zig");
pub const base64 = @import("base64.zig");
pub const build = @import("build.zig");
pub const c = @import("c/index.zig");
@@ -34,6 +35,7 @@ pub const zig = @import("zig/index.zig");
test "std" {
// run tests from these
+ _ = @import("atomic/index.zig");
_ = @import("array_list.zig");
_ = @import("buf_map.zig");
_ = @import("buf_set.zig");
diff --git a/std/mem.zig b/std/mem.zig
index cc3161cddd..0f66f549cc 100644
--- a/std/mem.zig
+++ b/std/mem.zig
@@ -32,6 +32,7 @@ pub const Allocator = struct {
freeFn: fn (self: &Allocator, old_mem: []u8) void,
fn create(self: &Allocator, comptime T: type) !&T {
+ if (@sizeOf(T) == 0) return &{};
const slice = try self.alloc(T, 1);
return &slice[0];
}
diff --git a/std/os/darwin.zig b/std/os/darwin.zig
index 44418649ab..0a62b03ab2 100644
--- a/std/os/darwin.zig
+++ b/std/os/darwin.zig
@@ -184,7 +184,7 @@ pub fn write(fd: i32, buf: &const u8, nbyte: usize) usize {
return errnoWrap(c.write(fd, @ptrCast(&const c_void, buf), nbyte));
}
-pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32,
+pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32,
offset: isize) usize
{
const ptr_result = c.mmap(@ptrCast(&c_void, address), length,
@@ -193,8 +193,8 @@ pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32,
return errnoWrap(isize_result);
}
-pub fn munmap(address: &u8, length: usize) usize {
- return errnoWrap(c.munmap(@ptrCast(&c_void, address), length));
+pub fn munmap(address: usize, length: usize) usize {
+ return errnoWrap(c.munmap(@intToPtr(&c_void, address), length));
}
pub fn unlink(path: &const u8) usize {
@@ -341,4 +341,4 @@ pub const timeval = c.timeval;
pub const mach_timebase_info_data = c.mach_timebase_info_data;
pub const mach_absolute_time = c.mach_absolute_time;
-pub const mach_timebase_info = c.mach_timebase_info; \ No newline at end of file
+pub const mach_timebase_info = c.mach_timebase_info;
diff --git a/std/os/index.zig b/std/os/index.zig
index 0639490725..ee9ff1516c 100644
--- a/std/os/index.zig
+++ b/std/os/index.zig
@@ -2,6 +2,10 @@ const std = @import("../index.zig");
const builtin = @import("builtin");
const Os = builtin.Os;
const is_windows = builtin.os == Os.windows;
+const is_posix = switch (builtin.os) {
+ builtin.Os.linux, builtin.Os.macosx => true,
+ else => false,
+};
const os = this;
test "std.os" {
@@ -2343,24 +2347,58 @@ pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void {
}
pub const Thread = struct {
- pid: i32,
- allocator: ?&mem.Allocator,
- stack: []u8,
+ data: Data,
+
+ pub const use_pthreads = is_posix and builtin.link_libc;
+ const Data = if (use_pthreads) struct {
+ handle: c.pthread_t,
+ stack_addr: usize,
+ stack_len: usize,
+ } else switch (builtin.os) {
+ builtin.Os.linux => struct {
+ pid: i32,
+ stack_addr: usize,
+ stack_len: usize,
+ },
+ builtin.Os.windows => struct {
+ handle: windows.HANDLE,
+ alloc_start: &c_void,
+ heap_handle: windows.HANDLE,
+ },
+ else => @compileError("Unsupported OS"),
+ };
pub fn wait(self: &const Thread) void {
- while (true) {
- const pid_value = @atomicLoad(i32, &self.pid, builtin.AtomicOrder.SeqCst);
- if (pid_value == 0) break;
- const rc = linux.futex_wait(@ptrToInt(&self.pid), linux.FUTEX_WAIT, pid_value, null);
- switch (linux.getErrno(rc)) {
- 0 => continue,
- posix.EINTR => continue,
- posix.EAGAIN => continue,
+ if (use_pthreads) {
+ const err = c.pthread_join(self.data.handle, null);
+ switch (err) {
+ 0 => {},
+ posix.EINVAL => unreachable,
+ posix.ESRCH => unreachable,
+ posix.EDEADLK => unreachable,
else => unreachable,
}
- }
- if (self.allocator) |a| {
- a.free(self.stack);
+ assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0);
+ } else switch (builtin.os) {
+ builtin.Os.linux => {
+ while (true) {
+ const pid_value = @atomicLoad(i32, &self.data.pid, builtin.AtomicOrder.SeqCst);
+ if (pid_value == 0) break;
+ const rc = linux.futex_wait(@ptrToInt(&self.data.pid), linux.FUTEX_WAIT, pid_value, null);
+ switch (linux.getErrno(rc)) {
+ 0 => continue,
+ posix.EINTR => continue,
+ posix.EAGAIN => continue,
+ else => unreachable,
+ }
+ }
+ assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0);
+ },
+ builtin.Os.windows => {
+ assert(windows.WaitForSingleObject(self.data.handle, windows.INFINITE) == windows.WAIT_OBJECT_0);
+ assert(windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start) != 0);
+ },
+ else => @compileError("Unsupported OS"),
}
}
};
@@ -2385,38 +2423,94 @@ pub const SpawnThreadError = error {
/// be copied.
SystemResources,
+ /// Not enough userland memory to spawn the thread.
+ OutOfMemory,
+
Unexpected,
};
-pub const SpawnThreadAllocatorError = SpawnThreadError || error{OutOfMemory};
-
/// caller must call wait on the returned thread
/// fn startFn(@typeOf(context)) T
/// where T is u8, noreturn, void, or !void
-pub fn spawnThreadAllocator(allocator: &mem.Allocator, context: var, comptime startFn: var) SpawnThreadAllocatorError!&Thread {
+/// caller must call wait on the returned thread
+pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread {
// TODO compile-time call graph analysis to determine stack upper bound
// https://github.com/zig-lang/zig/issues/157
const default_stack_size = 8 * 1024 * 1024;
- const stack_bytes = try allocator.alloc(u8, default_stack_size);
- const thread = try spawnThread(stack_bytes, context, startFn);
- thread.allocator = allocator;
- return thread;
-}
-/// stack must be big enough to store one Thread and one @typeOf(context), each with default alignment, at the end
-/// fn startFn(@typeOf(context)) T
-/// where T is u8, noreturn, void, or !void
-/// caller must call wait on the returned thread
-pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThreadError!&Thread {
const Context = @typeOf(context);
comptime assert(@ArgType(@typeOf(startFn), 0) == Context);
- var stack_end: usize = @ptrToInt(stack.ptr) + stack.len;
+ if (builtin.os == builtin.Os.windows) {
+ const WinThread = struct {
+ const OuterContext = struct {
+ thread: Thread,
+ inner: Context,
+ };
+ extern fn threadMain(arg: windows.LPVOID) windows.DWORD {
+ if (@sizeOf(Context) == 0) {
+ return startFn({});
+ } else {
+ return startFn(*@ptrCast(&Context, @alignCast(@alignOf(Context), arg)));
+ }
+ }
+ };
+
+ const heap_handle = windows.GetProcessHeap() ?? return SpawnThreadError.OutOfMemory;
+ const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext);
+ const bytes_ptr = windows.HeapAlloc(heap_handle, 0, byte_count) ?? return SpawnThreadError.OutOfMemory;
+ errdefer assert(windows.HeapFree(heap_handle, 0, bytes_ptr) != 0);
+ const bytes = @ptrCast(&u8, bytes_ptr)[0..byte_count];
+ const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable;
+ outer_context.inner = context;
+ outer_context.thread.data.heap_handle = heap_handle;
+ outer_context.thread.data.alloc_start = bytes_ptr;
+
+ const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(&c_void, &outer_context.inner);
+ outer_context.thread.data.handle = windows.CreateThread(null, default_stack_size, WinThread.threadMain,
+ parameter, 0, null) ??
+ {
+ const err = windows.GetLastError();
+ return switch (err) {
+ else => os.unexpectedErrorWindows(err),
+ };
+ };
+ return &outer_context.thread;
+ }
+
+ const MainFuncs = struct {
+ extern fn linuxThreadMain(ctx_addr: usize) u8 {
+ if (@sizeOf(Context) == 0) {
+ return startFn({});
+ } else {
+ return startFn(*@intToPtr(&const Context, ctx_addr));
+ }
+ }
+ extern fn posixThreadMain(ctx: ?&c_void) ?&c_void {
+ if (@sizeOf(Context) == 0) {
+ _ = startFn({});
+ return null;
+ } else {
+ _ = startFn(*@ptrCast(&const Context, @alignCast(@alignOf(Context), ctx)));
+ return null;
+ }
+ }
+ };
+
+ const MAP_GROWSDOWN = if (builtin.os == builtin.Os.linux) linux.MAP_GROWSDOWN else 0;
+
+ const mmap_len = default_stack_size;
+ const stack_addr = posix.mmap(null, mmap_len, posix.PROT_READ|posix.PROT_WRITE,
+ posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|MAP_GROWSDOWN, -1, 0);
+ if (stack_addr == posix.MAP_FAILED) return error.OutOfMemory;
+ errdefer assert(posix.munmap(stack_addr, mmap_len) == 0);
+
+ var stack_end: usize = stack_addr + mmap_len;
var arg: usize = undefined;
if (@sizeOf(Context) != 0) {
stack_end -= @sizeOf(Context);
stack_end -= stack_end % @alignOf(Context);
- assert(stack_end >= @ptrToInt(stack.ptr));
+ assert(stack_end >= stack_addr);
const context_ptr = @alignCast(@alignOf(Context), @intToPtr(&Context, stack_end));
*context_ptr = context;
arg = stack_end;
@@ -2424,36 +2518,53 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread
stack_end -= @sizeOf(Thread);
stack_end -= stack_end % @alignOf(Thread);
- assert(stack_end >= @ptrToInt(stack.ptr));
+ assert(stack_end >= stack_addr);
const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end));
- thread_ptr.stack = stack;
- thread_ptr.allocator = null;
- const threadMain = struct {
- extern fn threadMain(ctx_addr: usize) u8 {
- if (@sizeOf(Context) == 0) {
- return startFn({});
- } else {
- return startFn(*@intToPtr(&const Context, ctx_addr));
- }
- }
- }.threadMain;
+ thread_ptr.data.stack_addr = stack_addr;
+ thread_ptr.data.stack_len = mmap_len;
- const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND
- | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS
- | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED;
- const newtls: usize = 0;
- const rc = posix.clone(threadMain, stack_end, flags, arg, &thread_ptr.pid, newtls, &thread_ptr.pid);
- const err = posix.getErrno(rc);
- switch (err) {
- 0 => return thread_ptr,
- posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded,
- posix.EINVAL => unreachable,
- posix.ENOMEM => return SpawnThreadError.SystemResources,
- posix.ENOSPC => unreachable,
- posix.EPERM => unreachable,
- posix.EUSERS => unreachable,
- else => return unexpectedErrorPosix(err),
+ if (builtin.os == builtin.Os.windows) {
+ // use windows API directly
+ @compileError("TODO support spawnThread for Windows");
+ } else if (Thread.use_pthreads) {
+ // use pthreads
+ var attr: c.pthread_attr_t = undefined;
+ if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources;
+ defer assert(c.pthread_attr_destroy(&attr) == 0);
+
+ // align to page
+ stack_end -= stack_end % os.page_size;
+ assert(c.pthread_attr_setstack(&attr, @intToPtr(&c_void, stack_addr), stack_end - stack_addr) == 0);
+
+ const err = c.pthread_create(&thread_ptr.data.handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg));
+ switch (err) {
+ 0 => return thread_ptr,
+ posix.EAGAIN => return SpawnThreadError.SystemResources,
+ posix.EPERM => unreachable,
+ posix.EINVAL => unreachable,
+ else => return unexpectedErrorPosix(usize(err)),
+ }
+ } else if (builtin.os == builtin.Os.linux) {
+ // use linux API directly
+ const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND
+ | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS
+ | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED;
+ const newtls: usize = 0;
+ const rc = posix.clone(MainFuncs.linuxThreadMain, stack_end, flags, arg, &thread_ptr.data.pid, newtls, &thread_ptr.data.pid);
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => return thread_ptr,
+ posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded,
+ posix.EINVAL => unreachable,
+ posix.ENOMEM => return SpawnThreadError.SystemResources,
+ posix.ENOSPC => unreachable,
+ posix.EPERM => unreachable,
+ posix.EUSERS => unreachable,
+ else => return unexpectedErrorPosix(err),
+ }
+ } else {
+ @compileError("Unsupported OS");
}
}
diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig
index dcd9532d1d..368f074b9b 100644
--- a/std/os/linux/index.zig
+++ b/std/os/linux/index.zig
@@ -706,13 +706,13 @@ pub fn umount2(special: &const u8, flags: u32) usize {
return syscall2(SYS_umount2, @ptrToInt(special), flags);
}
-pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, offset: isize) usize {
+pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize {
return syscall6(SYS_mmap, @ptrToInt(address), length, prot, flags, usize(fd),
@bitCast(usize, offset));
}
-pub fn munmap(address: &u8, length: usize) usize {
- return syscall2(SYS_munmap, @ptrToInt(address), length);
+pub fn munmap(address: usize, length: usize) usize {
+ return syscall2(SYS_munmap, address, length);
}
pub fn read(fd: i32, buf: &u8, count: usize) usize {
diff --git a/std/os/test.zig b/std/os/test.zig
index 41afee004a..56d6e8b309 100644
--- a/std/os/test.zig
+++ b/std/os/test.zig
@@ -44,24 +44,12 @@ test "access file" {
}
test "spawn threads" {
- if (builtin.os != builtin.Os.linux) {
- // TODO implement threads on macos and windows
- return;
- }
-
- var direct_allocator = std.heap.DirectAllocator.init();
- defer direct_allocator.deinit();
-
var shared_ctx: i32 = 1;
- const thread1 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, {}, start1);
- const thread4 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, &shared_ctx, start2);
-
- var stack1: [1024]u8 = undefined;
- var stack2: [1024]u8 = undefined;
-
- const thread2 = try std.os.spawnThread(stack1[0..], &shared_ctx, start2);
- const thread3 = try std.os.spawnThread(stack2[0..], &shared_ctx, start2);
+ const thread1 = try std.os.spawnThread({}, start1);
+ const thread2 = try std.os.spawnThread(&shared_ctx, start2);
+ const thread3 = try std.os.spawnThread(&shared_ctx, start2);
+ const thread4 = try std.os.spawnThread(&shared_ctx, start2);
thread1.wait();
thread2.wait();
diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig
index d6ef7205e8..e13ed0f131 100644
--- a/std/os/windows/index.zig
+++ b/std/os/windows/index.zig
@@ -28,6 +28,9 @@ pub extern "kernel32" stdcallcc fn CreateProcessA(lpApplicationName: ?LPCSTR, lp
pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(lpSymlinkFileName: LPCSTR, lpTargetFileName: LPCSTR,
dwFlags: DWORD) BOOLEAN;
+
+pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE;
+
pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL;
pub extern "kernel32" stdcallcc fn ExitProcess(exit_code: UINT) noreturn;
@@ -318,6 +321,9 @@ pub const HEAP_CREATE_ENABLE_EXECUTE = 0x00040000;
pub const HEAP_GENERATE_EXCEPTIONS = 0x00000004;
pub const HEAP_NO_SERIALIZE = 0x00000001;
+pub const PTHREAD_START_ROUTINE = extern fn(LPVOID) DWORD;
+pub const LPTHREAD_START_ROUTINE = PTHREAD_START_ROUTINE;
+
test "import" {
_ = @import("util.zig");
}