diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2021-07-04 22:31:02 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-07-04 22:31:02 -0400 |
| commit | b7da1b2d45bc42a56eea3a143e4237a0712c4769 (patch) | |
| tree | 5474938657d5dfd9273562c160ad5f1e3a02b824 /lib/std/Thread | |
| parent | 5d0dad9acdac854d68e1447b90fd3dbde9ff0b2d (diff) | |
| parent | c8f90a7e7e10be62634454bf124bef3c6130a0db (diff) | |
| download | zig-b7da1b2d45bc42a56eea3a143e4237a0712c4769.tar.gz zig-b7da1b2d45bc42a56eea3a143e4237a0712c4769.zip | |
Merge pull request #9175 from kprotty/thread
std.Thread enhancements
Diffstat (limited to 'lib/std/Thread')
| -rw-r--r-- | lib/std/Thread/AutoResetEvent.zig | 8 | ||||
| -rw-r--r-- | lib/std/Thread/Futex.zig | 240 | ||||
| -rw-r--r-- | lib/std/Thread/Mutex.zig | 6 | ||||
| -rw-r--r-- | lib/std/Thread/ResetEvent.zig | 8 | ||||
| -rw-r--r-- | lib/std/Thread/StaticResetEvent.zig | 8 |
5 files changed, 131 insertions, 139 deletions
diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig index 7372a8d8d9..13e404d602 100644 --- a/lib/std/Thread/AutoResetEvent.zig +++ b/lib/std/Thread/AutoResetEvent.zig @@ -220,9 +220,9 @@ test "basic usage" { }; var context = Context{}; - const send_thread = try std.Thread.spawn(Context.sender, &context); - const recv_thread = try std.Thread.spawn(Context.receiver, &context); + const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context}); + const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - send_thread.wait(); - recv_thread.wait(); + send_thread.join(); + recv_thread.join(); } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 356a4e2046..2a18711231 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -64,9 +64,8 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut} /// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`. /// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`. pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - if (num_waiters == 0 or single_threaded) { - return; - } + if (single_threaded) return; + if (num_waiters == 0) return; return OsFutex.wake(ptr, num_waiters); } @@ -80,7 +79,23 @@ else if (target.isDarwin()) else if (std.builtin.link_libc) PosixFutex else - @compileError("Operating System unsupported"); + UnsupportedFutex; + +const UnsupportedFutex = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { + return unsupported(.{ ptr, expect, timeout }); + } + + fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { + return unsupported(.{ ptr, num_waiters }); + } + + fn unsupported(unused: anytype) noreturn { + @compileLog("Unsupported operating system", target.os.tag); + _ = unused; + unreachable; + } +}; const WindowsFutex = struct { const windows = std.os.windows; @@ -391,75 +406,73 @@ test "Futex - wait/wake" { } test "Futex - Signal" { - if (!single_threaded) { - return; + if (single_threaded) { + return error.SkipZigTest; } - try (struct { + const Paddle = struct { value: Atomic(u32) = Atomic(u32).init(0), + current: u32 = 0, - const Self = @This(); + fn run(self: *@This(), hit_to: *@This()) !void { + var iterations: usize = 4; + while (iterations > 0) : (iterations -= 1) { + var value: u32 = undefined; + while (true) { + value = self.value.load(.Acquire); + if (value != self.current) break; + Futex.wait(&self.value, self.current, null) catch unreachable; + } - fn send(self: *Self, value: u32) void { - self.value.store(value, .Release); - Futex.wake(&self.value, 1); - } + try testing.expectEqual(value, self.current + 1); + self.current = value; - fn recv(self: *Self, expected: u32) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == expected) break; - Futex.wait(&self.value, value, null) catch unreachable; + _ = hit_to.value.fetchAdd(1, .Release); + Futex.wake(&hit_to.value, 1); } } + }; - const Thread = struct { - tx: *Self, - rx: *Self, - - const start_value = 1; - - fn run(self: Thread) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - self.rx.recv(iterations); - self.tx.send(iterations); - } - } - }; - - fn run() !void { - var ping = Self{}; - var pong = Self{}; + var ping = Paddle{}; + var pong = Paddle{}; - const t1 = try std.Thread.spawn(Thread.run, .{ .rx = &ping, .tx = &pong }); - defer t1.wait(); + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong }); + defer t1.join(); - const t2 = try std.Thread.spawn(Thread.run, .{ .rx = &pong, .tx = &ping }); - defer t2.wait(); + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping }); + defer t2.join(); - ping.send(Thread.start_value); - } - }).run(); + _ = ping.value.fetchAdd(1, .Release); + Futex.wake(&ping.value, 1); } test "Futex - Broadcast" { - if (!single_threaded) { - return; + if (single_threaded) { + return error.SkipZigTest; } - try (struct { - threads: [10]*std.Thread = undefined, + const Context = struct { + threads: [4]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), - const Self = @This(); - const BROADCAST_EMPTY = 0; const BROADCAST_SENT = 1; const BROADCAST_RECEIVED = 2; - fn runReceiver(self: *Self) void { + fn runSender(self: *@This()) !void { + self.broadcast.store(BROADCAST_SENT, .Monotonic); + Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + + while (true) { + const broadcast = self.broadcast.load(.Acquire); + if (broadcast == BROADCAST_RECEIVED) break; + try testing.expectEqual(broadcast, BROADCAST_SENT); + Futex.wait(&self.broadcast, broadcast, null) catch unreachable; + } + } + + fn runReceiver(self: *@This()) void { while (true) { const broadcast = self.broadcast.load(.Acquire); if (broadcast == BROADCAST_SENT) break; @@ -473,98 +486,77 @@ test "Futex - Broadcast" { Futex.wake(&self.broadcast, 1); } } + }; - fn run() !void { - var self = Self{}; - - for (self.threads) |*thread| - thread.* = try std.Thread.spawn(runReceiver, &self); - defer for (self.threads) |thread| - thread.wait(); + var ctx = Context{}; + for (ctx.threads) |*thread| + thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx}); + defer for (ctx.threads) |thread| + thread.join(); - std.time.sleep(16 * std.time.ns_per_ms); - self.broadcast.store(BROADCAST_SENT, .Monotonic); - Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + // Try to wait for the threads to start before running runSender(). + // NOTE: not actually needed for correctness. + std.time.sleep(16 * std.time.ns_per_ms); + try ctx.runSender(); - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_RECEIVED) break; - try testing.expectEqual(broadcast, BROADCAST_SENT); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - - const notified = self.notified.load(.Monotonic); - try testing.expectEqual(notified, self.threads.len); - } - }).run(); + const notified = ctx.notified.load(.Monotonic); + try testing.expectEqual(notified, ctx.threads.len); } test "Futex - Chain" { - if (!single_threaded) { - return; + if (single_threaded) { + return error.SkipZigTest; } - try (struct { - completed: Signal = .{}, - threads: [10]struct { - thread: *std.Thread, - signal: Signal, - } = undefined, - - const Signal = struct { - state: Atomic(u32) = Atomic(u32).init(0), - - fn wait(self: *Signal) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == 1) break; - assert(value == 0); - Futex.wait(&self.value, 0, null) catch unreachable; - } - } + const Signal = struct { + value: Atomic(u32) = Atomic(u32).init(0), - fn notify(self: *Signal) void { - assert(self.value.load(.Unordered) == 0); - self.value.store(1, .Release); - Futex.wake(&self.value, 1); + fn wait(self: *@This()) void { + while (true) { + const value = self.value.load(.Acquire); + if (value == 1) break; + assert(value == 0); + Futex.wait(&self.value, 0, null) catch unreachable; } - }; + } - const Self = @This(); - const Chain = struct { - self: *Self, - index: usize, + fn notify(self: *@This()) void { + assert(self.value.load(.Unordered) == 0); + self.value.store(1, .Release); + Futex.wake(&self.value, 1); + } + }; - fn run(chain: Chain) void { - const this_signal = &chain.self.threads[chain.index].signal; + const Context = struct { + completed: Signal = .{}, + threads: [4]struct { + thread: std.Thread, + signal: Signal, + } = undefined, - var next_signal = &chain.self.completed; - if (chain.index + 1 < chain.self.threads.len) { - next_signal = &chain.self.threads[chain.index + 1].signal; - } + fn run(self: *@This(), index: usize) void { + const this_signal = &self.threads[index].signal; - this_signal.wait(); - next_signal.notify(); + var next_signal = &self.completed; + if (index + 1 < self.threads.len) { + next_signal = &self.threads[index + 1].signal; } - }; - fn run() !void { - var self = Self{}; + this_signal.wait(); + next_signal.notify(); + } + }; - for (self.threads) |*entry, index| { - entry.signal = .{}; - entry.thread = try std.Thread.spawn(Chain.run, .{ - .self = &self, - .index = index, - }); - } + var ctx = Context{}; + for (ctx.threads) |*entry, index| { + entry.signal = .{}; + entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index }); + } - self.threads[0].signal.notify(); - self.completed.wait(); + ctx.threads[0].signal.notify(); + ctx.completed.wait(); - for (self.threads) |entry| { - entry.thread.wait(); - } - } - }).run(); + for (ctx.threads) |entry| { + entry.thread.join(); + } } diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig index 49f138079d..35095b2a3c 100644 --- a/lib/std/Thread/Mutex.zig +++ b/lib/std/Thread/Mutex.zig @@ -297,12 +297,12 @@ test "basic usage" { try testing.expect(context.data == TestContext.incr_count); } else { const thread_count = 10; - var threads: [thread_count]*std.Thread = undefined; + var threads: [thread_count]std.Thread = undefined; for (threads) |*t| { - t.* = try std.Thread.spawn(worker, &context); + t.* = try std.Thread.spawn(.{}, worker, .{&context}); } for (threads) |t| - t.wait(); + t.join(); try testing.expect(context.data == thread_count * TestContext.incr_count); } diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index f161b46aa0..356b8eb78d 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -281,8 +281,8 @@ test "basic usage" { var context: Context = undefined; try context.init(); defer context.deinit(); - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -290,8 +290,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig index 6f869e0d89..40974938d0 100644 --- a/lib/std/Thread/StaticResetEvent.zig +++ b/lib/std/Thread/StaticResetEvent.zig @@ -384,8 +384,8 @@ test "basic usage" { }; var context = Context{}; - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -393,8 +393,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } |
