From 0a1def7833882249563358f262e2210beb77492a Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 19 Jun 2021 21:31:43 -0500 Subject: changes to accomodate std.Thread update --- lib/std/Thread/AutoResetEvent.zig | 8 ++--- lib/std/Thread/Futex.zig | 60 +++++++++++++++---------------------- lib/std/Thread/Mutex.zig | 6 ++-- lib/std/Thread/ResetEvent.zig | 8 ++--- lib/std/Thread/StaticResetEvent.zig | 8 ++--- 5 files changed, 39 insertions(+), 51 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig index 7372a8d8d9..13e404d602 100644 --- a/lib/std/Thread/AutoResetEvent.zig +++ b/lib/std/Thread/AutoResetEvent.zig @@ -220,9 +220,9 @@ test "basic usage" { }; var context = Context{}; - const send_thread = try std.Thread.spawn(Context.sender, &context); - const recv_thread = try std.Thread.spawn(Context.receiver, &context); + const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context}); + const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - send_thread.wait(); - recv_thread.wait(); + send_thread.join(); + recv_thread.join(); } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 356a4e2046..4153f7b7c0 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -413,32 +413,27 @@ test "Futex - Signal" { } } - const Thread = struct { - tx: *Self, - rx: *Self, + const start_value = 1; - const start_value = 1; - - fn run(self: Thread) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - self.rx.recv(iterations); - self.tx.send(iterations); - } + fn runThread(rx: *Self, tx: *Self) void { + var iterations: u32 = start_value; + while (iterations < 10) : (iterations += 1) { + self.rx.recv(iterations); + self.tx.send(iterations); } - }; + } fn run() !void { var ping = Self{}; var pong = Self{}; - const t1 = try std.Thread.spawn(Thread.run, .{ .rx = &ping, .tx = &pong }); - defer t1.wait(); + const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong }); + defer t1.join(); - const t2 = try std.Thread.spawn(Thread.run, .{ .rx = &pong, .tx = &ping }); - defer t2.wait(); + const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping }); + defer t2.join(); - ping.send(Thread.start_value); + ping.send(start_value); } }).run(); } @@ -507,7 +502,7 @@ test "Futex - Chain" { try (struct { completed: Signal = .{}, threads: [10]struct { - thread: *std.Thread, + thread: std.Thread, signal: Signal, } = undefined, @@ -531,39 +526,32 @@ test "Futex - Chain" { }; const Self = @This(); - const Chain = struct { - self: *Self, - index: usize, - fn run(chain: Chain) void { - const this_signal = &chain.self.threads[chain.index].signal; + fn runThread(self: *Self, index: usize) void { + const this_signal = &chain.self.threads[chain.index].signal; - var next_signal = &chain.self.completed; - if (chain.index + 1 < chain.self.threads.len) { - next_signal = &chain.self.threads[chain.index + 1].signal; - } - - this_signal.wait(); - next_signal.notify(); + var next_signal = &chain.self.completed; + if (chain.index + 1 < chain.self.threads.len) { + next_signal = &chain.self.threads[chain.index + 1].signal; } - }; + + this_signal.wait(); + next_signal.notify(); + } fn run() !void { var self = Self{}; for (self.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(Chain.run, .{ - .self = &self, - .index = index, - }); + entry.thread = try std.Thread.spawn(.{}, runThread .{&self, index}); } self.threads[0].signal.notify(); self.completed.wait(); for (self.threads) |entry| { - entry.thread.wait(); + entry.thread.join(); } } }).run(); diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig index 49f138079d..35095b2a3c 100644 --- a/lib/std/Thread/Mutex.zig +++ b/lib/std/Thread/Mutex.zig @@ -297,12 +297,12 @@ test "basic usage" { try testing.expect(context.data == TestContext.incr_count); } else { const thread_count = 10; - var threads: [thread_count]*std.Thread = undefined; + var threads: [thread_count]std.Thread = undefined; for (threads) |*t| { - t.* = try std.Thread.spawn(worker, &context); + t.* = try std.Thread.spawn(.{}, worker, .{&context}); } for (threads) |t| - t.wait(); + t.join(); try testing.expect(context.data == thread_count * TestContext.incr_count); } diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index f161b46aa0..356b8eb78d 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -281,8 +281,8 @@ test "basic usage" { var context: Context = undefined; try context.init(); defer context.deinit(); - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -290,8 +290,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig index 6f869e0d89..40974938d0 100644 --- a/lib/std/Thread/StaticResetEvent.zig +++ b/lib/std/Thread/StaticResetEvent.zig @@ -384,8 +384,8 @@ test "basic usage" { }; var context = Context{}; - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -393,8 +393,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } -- cgit v1.2.3 From ca1e61b851351ec66ee3f1937586a2f9d02bbafc Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 09:56:30 -0500 Subject: std.Thread: fix some typos --- lib/std/Thread.zig | 34 ++++++++++++++-------------------- lib/std/Thread/Futex.zig | 2 +- 2 files changed, 15 insertions(+), 21 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 552961988b..22f12c3c95 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -24,16 +24,14 @@ pub const Condition = @import("Thread/Condition.zig"); pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); test "std.Thread" { - if (!builtin.single_threaded) { - // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. - _ = AutoResetEvent; - _ = Futex; - _ = ResetEvent; - _ = StaticResetEvent; - _ = Mutex; - _ = Semaphore; - _ = Condition; - } + // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. + _ = AutoResetEvent; + _ = Futex; + _ = ResetEvent; + _ = StaticResetEvent; + _ = Mutex; + _ = Semaphore; + _ = Condition; } pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; @@ -114,17 +112,13 @@ pub const SpawnError = error { /// `config` can be used as hints to the platform for now to spawn and execute the `function`. /// The caller must eventually either call `join()` to wait for the thread to finish and free its resources /// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`. -pub fn spawn( - config: SpawnConfig, - comptime function: anytype, - args: std.meta.ArgsTuple(function), -) SpawnError!Thread { +pub fn spawn(config: SpawnConfig, comptime function: anytype, args: anytype) SpawnError!Thread { if (std.builtin.single_threaded) { @compileError("cannot spawn thread when building in single-threaded mode"); } - const impl = try Thread.spawn(config, function, args); - return .{ .impl = impl }; + const impl = try Impl.spawn(config, function, args); + return Thread{ .impl = impl }; } /// Represents a kernel thread handle. @@ -438,7 +432,7 @@ const LinuxThreadImpl = struct { fn getCurrentId() Id { return tls_thread_id orelse { - const tid = linux.gettid(); + const tid = @bitCast(u32, linux.gettid()); tls_thread_id = tid; return tid; }; @@ -550,7 +544,7 @@ const LinuxThreadImpl = struct { const instance = @ptrCast(*Instance, @alignCast(@alignOf(Instance), &mapped[instance_offset])); instance.* = .{ .fn_args = args, - .thread = .{ .mapped = .mapped }, + .thread = .{ .mapped = mapped }, }; const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | @@ -591,7 +585,7 @@ const LinuxThreadImpl = struct { } fn join(self: Impl) void { - defer self.thread.free(); + defer os.munmap(self.thread.mapped); var spin: u8 = 10; while (true) { diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 4153f7b7c0..971f1bd095 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -544,7 +544,7 @@ test "Futex - Chain" { for (self.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, runThread .{&self, index}); + entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index}); } self.threads[0].signal.notify(); -- cgit v1.2.3 From 1ae969e5299ea71231e7045110332b23989e653c Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 10:37:40 -0500 Subject: std.Thread: even more typo fixes --- lib/std/Thread/Futex.zig | 4 ++-- lib/std/event/loop.zig | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 971f1bd095..033ecf688d 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -444,7 +444,7 @@ test "Futex - Broadcast" { } try (struct { - threads: [10]*std.Thread = undefined, + threads: [10]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), @@ -475,7 +475,7 @@ test "Futex - Broadcast" { for (self.threads) |*thread| thread.* = try std.Thread.spawn(runReceiver, &self); defer for (self.threads) |thread| - thread.wait(); + thread.join(); std.time.sleep(16 * std.time.ns_per_ms); self.broadcast.store(BROADCAST_SENT, .Monotonic); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 26cbf3b988..9c8550d459 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -183,7 +183,7 @@ pub const Loop = struct { resume_node_count, ); - self.extra_threads = try self.arena.allocator.alloc(*Thread, extra_thread_count); + self.extra_threads = try self.arena.allocator.alloc(Thread, extra_thread_count); try self.initOsData(extra_thread_count); errdefer self.deinitOsData(); -- cgit v1.2.3 From 18bcb2e990853f3e32cb0d72beb962390c9e8714 Mon Sep 17 00:00:00 2001 From: kprotty Date: Fri, 25 Jun 2021 14:04:11 -0500 Subject: std.Thread: fix futex thread spawning --- lib/std/Thread/Futex.zig | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 033ecf688d..26ebe23364 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -418,8 +418,8 @@ test "Futex - Signal" { fn runThread(rx: *Self, tx: *Self) void { var iterations: u32 = start_value; while (iterations < 10) : (iterations += 1) { - self.rx.recv(iterations); - self.tx.send(iterations); + rx.recv(iterations); + tx.send(iterations); } } @@ -528,11 +528,11 @@ test "Futex - Chain" { const Self = @This(); fn runThread(self: *Self, index: usize) void { - const this_signal = &chain.self.threads[chain.index].signal; + const this_signal = &self.threads[index].signal; - var next_signal = &chain.self.completed; - if (chain.index + 1 < chain.self.threads.len) { - next_signal = &chain.self.threads[chain.index + 1].signal; + var next_signal = &self.completed; + if (index + 1 < self.threads.len) { + next_signal = &self.threads[index + 1].signal; } this_signal.wait(); -- cgit v1.2.3 From fd4a607bb2f1a1cbf8b8c1fd5d35f5f775e79114 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 09:03:53 -0500 Subject: std.Thread: fix futex test + thread errors --- lib/std/Thread.zig | 2 + lib/std/Thread/Futex.zig | 186 ++++++++++++++++++++++------------------------- 2 files changed, 90 insertions(+), 98 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 04bd94729d..19987d47a1 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -385,6 +385,7 @@ const PosixThreadImpl = struct { }; const args_ptr = try allocator.create(Args); + args_ptr.* = args; errdefer allocator.destroy(args_ptr); var attr: c.pthread_attr_t = undefined; @@ -523,6 +524,7 @@ const LinuxThreadImpl = struct { error.PermissionDenied => unreachable, else => |e| return e, }; + assert(mapped.len >= map_bytes); errdefer os.munmap(mapped); // map everything but the guard page as read/write diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 26ebe23364..b1b2128caa 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -391,70 +391,74 @@ test "Futex - wait/wake" { } test "Futex - Signal" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { + const Paddle = struct { value: Atomic(u32) = Atomic(u32).init(0), + current: u32 = 0, - const Self = @This(); - - fn send(self: *Self, value: u32) void { - self.value.store(value, .Release); - Futex.wake(&self.value, 1); - } - - fn recv(self: *Self, expected: u32) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == expected) break; - Futex.wait(&self.value, value, null) catch unreachable; - } - } + fn run(self: *@This(), hit_to: *@This()) !void { + var iterations: usize = 4; + while (iterations > 0) : (iterations -= 1) { - const start_value = 1; + var value: u32 = undefined; + while (true) { + value = self.value.load(.Acquire); + if (value != self.current) break; + Futex.wait(&self.value, self.current, null) catch unreachable; + } - fn runThread(rx: *Self, tx: *Self) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - rx.recv(iterations); - tx.send(iterations); + try testing.expectEqual(value, self.current + 1); + self.current = value; + + _ = hit_to.value.fetchAdd(1, .Release); + Futex.wake(&hit_to.value, 1); } } + }; - fn run() !void { - var ping = Self{}; - var pong = Self{}; + var ping = Paddle{}; + var pong = Paddle{}; - const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong }); - defer t1.join(); + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong}); + defer t1.join(); - const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping }); - defer t2.join(); + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping}); + defer t2.join(); - ping.send(start_value); - } - }).run(); + _ = ping.value.fetchAdd(1, .Release); + Futex.wake(&ping.value, 1); } test "Futex - Broadcast" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { - threads: [10]std.Thread = undefined, + const Context = struct { + threads: [4]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), - const Self = @This(); - const BROADCAST_EMPTY = 0; const BROADCAST_SENT = 1; const BROADCAST_RECEIVED = 2; - fn runReceiver(self: *Self) void { + fn runSender(self: *@This()) !void { + self.broadcast.store(BROADCAST_SENT, .Monotonic); + Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + + while (true) { + const broadcast = self.broadcast.load(.Acquire); + if (broadcast == BROADCAST_RECEIVED) break; + try testing.expectEqual(broadcast, BROADCAST_SENT); + Futex.wait(&self.broadcast, broadcast, null) catch unreachable; + } + } + + fn runReceiver(self: *@This()) void { while (true) { const broadcast = self.broadcast.load(.Acquire); if (broadcast == BROADCAST_SENT) break; @@ -468,66 +472,55 @@ test "Futex - Broadcast" { Futex.wake(&self.broadcast, 1); } } + }; - fn run() !void { - var self = Self{}; - - for (self.threads) |*thread| - thread.* = try std.Thread.spawn(runReceiver, &self); - defer for (self.threads) |thread| - thread.join(); + var ctx = Context{}; + for (ctx.threads) |*thread| + thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx}); + defer for (ctx.threads) |thread| + thread.join(); - std.time.sleep(16 * std.time.ns_per_ms); - self.broadcast.store(BROADCAST_SENT, .Monotonic); - Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + // Try to wait for the threads to start before running runSender(). + // NOTE: not actually needed for correctness. + std.time.sleep(16 * std.time.ns_per_ms); + try ctx.runSender(); - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_RECEIVED) break; - try testing.expectEqual(broadcast, BROADCAST_SENT); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - - const notified = self.notified.load(.Monotonic); - try testing.expectEqual(notified, self.threads.len); - } - }).run(); + const notified = ctx.notified.load(.Monotonic); + try testing.expectEqual(notified, ctx.threads.len); } test "Futex - Chain" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { - completed: Signal = .{}, - threads: [10]struct { - thread: std.Thread, - signal: Signal, - } = undefined, - - const Signal = struct { - state: Atomic(u32) = Atomic(u32).init(0), + const Signal = struct { + value: Atomic(u32) = Atomic(u32).init(0), - fn wait(self: *Signal) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == 1) break; - assert(value == 0); - Futex.wait(&self.value, 0, null) catch unreachable; - } + fn wait(self: *@This()) void { + while (true) { + const value = self.value.load(.Acquire); + if (value == 1) break; + assert(value == 0); + Futex.wait(&self.value, 0, null) catch unreachable; } + } - fn notify(self: *Signal) void { - assert(self.value.load(.Unordered) == 0); - self.value.store(1, .Release); - Futex.wake(&self.value, 1); - } - }; + fn notify(self: *@This()) void { + assert(self.value.load(.Unordered) == 0); + self.value.store(1, .Release); + Futex.wake(&self.value, 1); + } + }; - const Self = @This(); + const Context = struct { + completed: Signal = .{}, + threads: [4]struct { + thread: std.Thread, + signal: Signal, + } = undefined, - fn runThread(self: *Self, index: usize) void { + fn run(self: *@This(), index: usize) void { const this_signal = &self.threads[index].signal; var next_signal = &self.completed; @@ -538,21 +531,18 @@ test "Futex - Chain" { this_signal.wait(); next_signal.notify(); } + }; - fn run() !void { - var self = Self{}; - - for (self.threads) |*entry, index| { - entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index}); - } + var ctx = Context{}; + for (ctx.threads) |*entry, index| { + entry.signal = .{}; + entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index}); + } - self.threads[0].signal.notify(); - self.completed.wait(); + ctx.threads[0].signal.notify(); + ctx.completed.wait(); - for (self.threads) |entry| { - entry.thread.join(); - } - } - }).run(); + for (ctx.threads) |entry| { + entry.thread.join(); + } } -- cgit v1.2.3 From 7b323f84ca876c86bbe06f132d5a5d3775def3a2 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 13:00:54 -0500 Subject: std.Thread: more fixes --- lib/std/Thread.zig | 40 ++++++++++++++++++++++++++++++++++++++-- lib/std/Thread/Futex.zig | 25 ++++++++++++++++++++----- 2 files changed, 58 insertions(+), 7 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index db049b5d21..cbdcf00e9f 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -45,7 +45,7 @@ else if (use_pthreads) else if (target.os.tag == .linux) LinuxThreadImpl else - @compileLog("Unsupported operating system", target.os.tag); + UnsupportedImpl; impl: Impl, @@ -200,6 +200,40 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) { } } +const UnsupportedImpl = struct { + pub const ThreadHandle = void; + + fn getCurrentId() u64 { + return unsupported({}); + } + + fn getCpuCount() !usize { + return unsupported({}); + } + + fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { + return unsupported(.{config, f, args}); + } + + fn getHandle(self: Impl) ThreadHandle { + return unsupported(self); + } + + fn detach(self: Impl) void { + return unsupported(self); + } + + fn join(self: Impl) void { + return unsupported(self); + } + + fn unsupported(unusued: anytype) noreturn { + @compileLog("Unsupported operating system", target.os.tag); + _ = unusued; + unreachable; + } +}; + const WindowsThreadImpl = struct { const windows = os.windows; @@ -725,7 +759,9 @@ const LinuxThreadImpl = struct { \\ li a7, 93 \\ ecall ), - else => @compileError("Platform not supported"), + else => |cpu_arch| { + @compileLog("linux arch", cpu_arch, "is not supported"); + }, }); } } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index b1b2128caa..de7dd5e73b 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -64,10 +64,9 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut} /// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`. /// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`. pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - if (num_waiters == 0 or single_threaded) { - return; - } - + if (single_threaded) return; + if (num_waiters == 0) return; + return OsFutex.wake(ptr, num_waiters); } @@ -80,7 +79,23 @@ else if (target.isDarwin()) else if (std.builtin.link_libc) PosixFutex else - @compileError("Operating System unsupported"); + UnsupportedFutex; + +const UnsupportedFutex = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { + return unsupported(.{ptr, expect, timeout}); + } + + fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { + return unsupported(.{ptr, num_waiters}); + } + + fn unsupported(unused: anytype) noreturn { + @compileLog("Unsupported operating system", target.os.tag); + _ = unused; + unreachable; + } +}; const WindowsFutex = struct { const windows = std.os.windows; -- cgit v1.2.3 From f0fa129e9b1cdbd90b231da14c6cd99c9413aa98 Mon Sep 17 00:00:00 2001 From: kprotty Date: Mon, 28 Jun 2021 11:27:23 -0500 Subject: std.Thread: more cleanup & testing --- doc/langref.html.in | 12 +- lib/std/Thread.zig | 300 +++++++++++++++++++++++++++-------------------- lib/std/Thread/Futex.zig | 6 +- lib/std/c.zig | 1 + lib/std/os/test.zig | 11 +- 5 files changed, 184 insertions(+), 146 deletions(-) (limited to 'lib/std/Thread') diff --git a/doc/langref.html.in b/doc/langref.html.in index 667b4cd2a7..22965f667e 100644 --- a/doc/langref.html.in +++ b/doc/langref.html.in @@ -958,14 +958,14 @@ const assert = std.debug.assert; threadlocal var x: i32 = 1234; test "thread local storage" { - const thread1 = try std.Thread.spawn(testTls, {}); - const thread2 = try std.Thread.spawn(testTls, {}); - testTls({}); - thread1.wait(); - thread2.wait(); + const thread1 = try std.Thread.spawn(.{}, testTls, .{}); + const thread2 = try std.Thread.spawn(.{}, testTls, .{}); + testTls(); + thread1.join(); + thread2.join(); } -fn testTls(_: void) void { +fn testTls() void { assert(x == 1234); x += 1; assert(x == 1235); diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index cbdcf00e9f..73678714d0 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -24,17 +24,6 @@ pub const Condition = @import("Thread/Condition.zig"); pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); -test "std.Thread" { - // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. - _ = AutoResetEvent; - _ = Futex; - _ = ResetEvent; - _ = StaticResetEvent; - _ = Mutex; - _ = Semaphore; - _ = Condition; -} - pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; const Thread = @This(); @@ -50,7 +39,6 @@ else impl: Impl, /// Represents a unique ID per thread. -/// May be an integer or pointer depending on the platform. pub const Id = u64; /// Returns the platform ID of the callers thread. @@ -79,7 +67,7 @@ pub const SpawnConfig = struct { stack_size: usize = 16 * 1024 * 1024, }; -pub const SpawnError = error { +pub const SpawnError = error{ /// A system-imposed limit on the number of threads was encountered. /// There are a number of limits that may trigger this error: /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), @@ -115,7 +103,7 @@ pub const SpawnError = error { /// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`. pub fn spawn(config: SpawnConfig, comptime function: anytype, args: anytype) SpawnError!Thread { if (std.builtin.single_threaded) { - @compileError("cannot spawn thread when building in single-threaded mode"); + @compileError("Cannot spawn thread when building in single-threaded mode"); } const impl = try Impl.spawn(config, function, args); @@ -132,11 +120,13 @@ pub fn getHandle(self: Thread) Handle { } /// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion. +/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior. pub fn detach(self: Thread) void { return self.impl.detach(); } /// Waits for the thread to complete, then deallocates any resources created on `spawn()`. +/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior. pub fn join(self: Thread) void { return self.impl.join(); } @@ -200,6 +190,8 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) { } } +/// We can't compile error in the `Impl` switch statement as its eagerly evaluated. +/// So instead, we compile-error on the methods themselves for platforms which don't support threads. const UnsupportedImpl = struct { pub const ThreadHandle = void; @@ -212,7 +204,7 @@ const UnsupportedImpl = struct { } fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { - return unsupported(.{config, f, args}); + return unsupported(.{ config, f, args }); } fn getHandle(self: Impl) ThreadHandle { @@ -225,7 +217,7 @@ const UnsupportedImpl = struct { fn join(self: Impl) void { return unsupported(self); - } + } fn unsupported(unusued: anytype) noreturn { @compileLog("Unsupported operating system", target.os.tag); @@ -244,6 +236,7 @@ const WindowsThreadImpl = struct { } fn getCpuCount() !usize { + // Faster than calling into GetSystemInfo(), even if amortized. return windows.peb().NumberOfProcessors; } @@ -299,16 +292,17 @@ const WindowsThreadImpl = struct { // Its also fine if the limit here is incorrect as stack size is only a hint. var stack_size = std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32); stack_size = std.math.max(64 * 1024, stack_size); - + instance.thread.thread_handle = windows.kernel32.CreateThread( - null, - stack_size, - Instance.entryFn, - @ptrCast(*c_void, instance), - 0, + null, + stack_size, + Instance.entryFn, + @ptrCast(*c_void, instance), + 0, null, ) orelse { - return windows.unexpectedError(windows.kernel32.GetLastError()); + const errno = windows.kernel32.GetLastError(); + return windows.unexpectedError(errno); }; return Impl{ .thread = &instance.thread }; @@ -332,7 +326,7 @@ const WindowsThreadImpl = struct { windows.CloseHandle(self.thread.thread_handle); assert(self.thread.completion.load(.SeqCst) == .completed); self.thread.free(); - } + } }; const PosixThreadImpl = struct { @@ -374,7 +368,9 @@ const PosixThreadImpl = struct { fn getCpuCount() !usize { switch (target.os.tag) { - .linux => return LinuxThreadImpl.getCpuCount(), + .linux => { + return LinuxThreadImpl.getCpuCount(); + }, .openbsd => { var count: c_int = undefined; var count_size: usize = @sizeOf(c_int); @@ -413,6 +409,7 @@ const PosixThreadImpl = struct { const Instance = struct { fn entryFn(raw_arg: ?*c_void) callconv(.C) ?*c_void { + // @alignCast() below doesn't support zero-sized-types (ZST) if (@sizeOf(Args) < 1) { return callFn(f, @as(Args, undefined)); } @@ -457,8 +454,9 @@ const PosixThreadImpl = struct { fn detach(self: Impl) void { switch (c.pthread_detach(self.handle)) { - os.EINVAL => unreachable, - os.ESRCH => unreachable, + 0 => {}, + os.EINVAL => unreachable, // thread handle is not joinable + os.ESRCH => unreachable, // thread handle is invalid else => unreachable, } } @@ -466,9 +464,9 @@ const PosixThreadImpl = struct { fn join(self: Impl) void { switch (c.pthread_join(self.handle, null)) { 0 => {}, - os.EINVAL => unreachable, - os.ESRCH => unreachable, - os.EDEADLK => unreachable, + os.EINVAL => unreachable, // thread handle is not joinable (or another thread is already joining in) + os.ESRCH => unreachable, // thread handle is invalid + os.EDEADLK => unreachable, // two threads tried to join each other else => unreachable, } } @@ -476,7 +474,7 @@ const PosixThreadImpl = struct { const LinuxThreadImpl = struct { const linux = os.linux; - + pub const ThreadHandle = i32; threadlocal var tls_thread_id: ?Id = null; @@ -491,7 +489,8 @@ const LinuxThreadImpl = struct { fn getCpuCount() !usize { const cpu_set = try os.sched_getaffinity(0); - return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast + // TODO: should not need this usize cast + return @as(usize, os.CPU_COUNT(cpu_set)); } thread: *ThreadCompletion, @@ -547,7 +546,7 @@ const LinuxThreadImpl = struct { bytes = std.mem.alignForward(bytes, std.mem.page_size); break :blk bytes; }; - + // map all memory needed without read/write permissions // to avoid committing the whole region right away const mapped = os.mmap( @@ -654,7 +653,7 @@ const LinuxThreadImpl = struct { switch (linux.getErrno(linux.futex_wait( &self.thread.child_tid.value, - linux.FUTEX_WAIT, + linux.FUTEX_WAIT, tid, null, ))) { @@ -671,98 +670,145 @@ const LinuxThreadImpl = struct { extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn; comptime { if (target.os.tag == .linux) { - asm(switch (target.cpu.arch) { - .i386 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $91, %eax - \\ movl 4(%esp), %ebx - \\ movl 8(%esp), %ecx - \\ int $128 - \\ xorl %ebx, %ebx - \\ movl $1, %eax - \\ int $128 - ), - .x86_64 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $11, %eax - \\ syscall - \\ xor %rdi, %rdi - \\ movl $60, %eax - \\ syscall - ), - .arm, .armeb, .thumb, .thumbeb => ( - \\.syntax unified - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov r7, #91 - \\ svc 0 - \\ mov r7, #1 - \\ svc 0 - ), - .aarch64, .aarch64_be, .aarch64_32 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov x8, #215 - \\ svc 0 - \\ mov x8, #93 - \\ svc 0 - ), - .mips, .mipsel, => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit,@function - \\__unmap_and_exit: - \\ move $sp, $25 - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .mips64, .mips64el => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .powerpc, .powerpc64, .powerpc64le => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li 0, 91 - \\ sc - \\ li 0, 1 - \\ sc - \\ blr - ), - .riscv64 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li a7, 215 - \\ ecall - \\ li a7, 93 - \\ ecall - ), - else => |cpu_arch| { - @compileLog("linux arch", cpu_arch, "is not supported"); - }, - }); + asm (switch (target.cpu.arch) { + .i386 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $91, %eax + \\ movl 4(%esp), %ebx + \\ movl 8(%esp), %ecx + \\ int $128 + \\ xorl %ebx, %ebx + \\ movl $1, %eax + \\ int $128 + ), + .x86_64 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $11, %eax + \\ syscall + \\ xor %rdi, %rdi + \\ movl $60, %eax + \\ syscall + ), + .arm, .armeb, .thumb, .thumbeb => ( + \\.syntax unified + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov r7, #91 + \\ svc 0 + \\ mov r7, #1 + \\ svc 0 + ), + .aarch64, .aarch64_be, .aarch64_32 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov x8, #215 + \\ svc 0 + \\ mov x8, #93 + \\ svc 0 + ), + .mips, + .mipsel, + => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit,@function + \\__unmap_and_exit: + \\ move $sp, $25 + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .mips64, .mips64el => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .powerpc, .powerpc64, .powerpc64le => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li 0, 91 + \\ sc + \\ li 0, 1 + \\ sc + \\ blr + ), + .riscv64 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li a7, 215 + \\ ecall + \\ li a7, 93 + \\ ecall + ), + else => |cpu_arch| { + @compileLog("linux arch", cpu_arch, "is not supported"); + }, + }); } } -}; \ No newline at end of file +}; + +test "std.Thread" { + // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. + _ = AutoResetEvent; + _ = Futex; + _ = ResetEvent; + _ = StaticResetEvent; + _ = Mutex; + _ = Semaphore; + _ = Condition; +} + +fn testIncrementNotify(value: *usize, event: *ResetEvent) void { + value.* += 1; + event.set(); +} + +test "Thread.join" { + if (std.builtin.single_threaded) return error.SkipZigTest; + + var value: usize = 0; + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + thread.join(); + + try std.testing.expectEqual(value, 1); +} + +test "Thread.detach" { + if (std.builtin.single_threaded) return error.SkipZigTest; + + var value: usize = 0; + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + thread.detach(); + + event.wait(); + try std.testing.expectEqual(value, 1); +} \ No newline at end of file diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index de7dd5e73b..4725deb2e3 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -407,7 +407,7 @@ test "Futex - wait/wake" { test "Futex - Signal" { if (single_threaded) { - return; + return error.SkipZigTest; } const Paddle = struct { @@ -449,7 +449,7 @@ test "Futex - Signal" { test "Futex - Broadcast" { if (single_threaded) { - return; + return error.SkipZigTest; } const Context = struct { @@ -506,7 +506,7 @@ test "Futex - Broadcast" { test "Futex - Chain" { if (single_threaded) { - return; + return error.SkipZigTest; } const Signal = struct { diff --git a/lib/std/c.zig b/lib/std/c.zig index 2a8915bd1b..a3fe814a71 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -277,6 +277,7 @@ pub extern "c" fn pthread_attr_setguardsize(attr: *pthread_attr_t, guardsize: us pub extern "c" fn pthread_attr_destroy(attr: *pthread_attr_t) c_int; pub extern "c" fn pthread_self() pthread_t; pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?*?*c_void) c_int; +pub extern "c" fn pthread_detach(thread: pthread_t) c_int; pub extern "c" fn pthread_atfork( prepare: ?fn () callconv(.C) void, parent: ?fn () callconv(.C) void, diff --git a/lib/std/os/test.zig b/lib/std/os/test.zig index 750c63d447..6184c97706 100644 --- a/lib/std/os/test.zig +++ b/lib/std/os/test.zig @@ -321,17 +321,8 @@ test "std.Thread.getCurrentId" { var thread_current_id: Thread.Id = undefined; const thread = try Thread.spawn(.{}, testThreadIdFn, .{&thread_current_id}); - const thread_id = thread.getHandle(); thread.join(); - if (Thread.use_pthreads) { - try expect(thread_current_id == thread_id); - } else if (native_os == .windows) { - try expect(Thread.getCurrentId() != thread_current_id); - } else { - // If the thread completes very quickly, then thread_id can be 0. See the - // documentation comments for `std.Thread.handle`. - try expect(thread_id == 0 or thread_current_id == thread_id); - } + try expect(Thread.getCurrentId() != thread_current_id); } test "spawn threads" { -- cgit v1.2.3 From 98106b09d5bf1bcc1be4fe09a7fa645b3b343732 Mon Sep 17 00:00:00 2001 From: kprotty Date: Wed, 30 Jun 2021 21:49:38 -0500 Subject: zig fmt --- lib/std/Thread.zig | 6 +++--- lib/std/Thread/Futex.zig | 17 ++++++++--------- tools/update_cpu_features.zig | 2 +- 3 files changed, 12 insertions(+), 13 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 0e82158889..8f245c257c 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -792,7 +792,7 @@ test "Thread.join" { try event.init(); defer event.deinit(); - const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.join(); try std.testing.expectEqual(value, 1); @@ -806,9 +806,9 @@ test "Thread.detach" { try event.init(); defer event.deinit(); - const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.detach(); event.wait(); try std.testing.expectEqual(value, 1); -} \ No newline at end of file +} diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 4725deb2e3..2a18711231 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -66,7 +66,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut} pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { if (single_threaded) return; if (num_waiters == 0) return; - + return OsFutex.wake(ptr, num_waiters); } @@ -83,11 +83,11 @@ else const UnsupportedFutex = struct { fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - return unsupported(.{ptr, expect, timeout}); + return unsupported(.{ ptr, expect, timeout }); } fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - return unsupported(.{ptr, num_waiters}); + return unsupported(.{ ptr, num_waiters }); } fn unsupported(unused: anytype) noreturn { @@ -417,7 +417,6 @@ test "Futex - Signal" { fn run(self: *@This(), hit_to: *@This()) !void { var iterations: usize = 4; while (iterations > 0) : (iterations -= 1) { - var value: u32 = undefined; while (true) { value = self.value.load(.Acquire); @@ -427,7 +426,7 @@ test "Futex - Signal" { try testing.expectEqual(value, self.current + 1); self.current = value; - + _ = hit_to.value.fetchAdd(1, .Release); Futex.wake(&hit_to.value, 1); } @@ -437,10 +436,10 @@ test "Futex - Signal" { var ping = Paddle{}; var pong = Paddle{}; - const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong}); + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong }); defer t1.join(); - const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping}); + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping }); defer t2.join(); _ = ping.value.fetchAdd(1, .Release); @@ -497,7 +496,7 @@ test "Futex - Broadcast" { // Try to wait for the threads to start before running runSender(). // NOTE: not actually needed for correctness. - std.time.sleep(16 * std.time.ns_per_ms); + std.time.sleep(16 * std.time.ns_per_ms); try ctx.runSender(); const notified = ctx.notified.load(.Monotonic); @@ -551,7 +550,7 @@ test "Futex - Chain" { var ctx = Context{}; for (ctx.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index}); + entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index }); } ctx.threads[0].signal.notify(); diff --git a/tools/update_cpu_features.zig b/tools/update_cpu_features.zig index 4694e3de25..bdc82fa22e 100644 --- a/tools/update_cpu_features.zig +++ b/tools/update_cpu_features.zig @@ -819,7 +819,7 @@ pub fn main() anyerror!void { var threads = try arena.alloc(std.Thread, llvm_targets.len); for (llvm_targets) |llvm_target, i| { threads[i] = try std.Thread.spawn(.{}, processOneTarget, .{ - Job{ + Job{ .llvm_tblgen_exe = llvm_tblgen_exe, .llvm_src_root = llvm_src_root, .zig_src_dir = zig_src_dir, -- cgit v1.2.3