aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2021-07-04 22:31:02 -0400
committerGitHub <noreply@github.com>2021-07-04 22:31:02 -0400
commitb7da1b2d45bc42a56eea3a143e4237a0712c4769 (patch)
tree5474938657d5dfd9273562c160ad5f1e3a02b824 /lib/std/Thread
parent5d0dad9acdac854d68e1447b90fd3dbde9ff0b2d (diff)
parentc8f90a7e7e10be62634454bf124bef3c6130a0db (diff)
downloadzig-b7da1b2d45bc42a56eea3a143e4237a0712c4769.tar.gz
zig-b7da1b2d45bc42a56eea3a143e4237a0712c4769.zip
Merge pull request #9175 from kprotty/thread
std.Thread enhancements
Diffstat (limited to 'lib/std/Thread')
-rw-r--r--lib/std/Thread/AutoResetEvent.zig8
-rw-r--r--lib/std/Thread/Futex.zig240
-rw-r--r--lib/std/Thread/Mutex.zig6
-rw-r--r--lib/std/Thread/ResetEvent.zig8
-rw-r--r--lib/std/Thread/StaticResetEvent.zig8
5 files changed, 131 insertions, 139 deletions
diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig
index 7372a8d8d9..13e404d602 100644
--- a/lib/std/Thread/AutoResetEvent.zig
+++ b/lib/std/Thread/AutoResetEvent.zig
@@ -220,9 +220,9 @@ test "basic usage" {
};
var context = Context{};
- const send_thread = try std.Thread.spawn(Context.sender, &context);
- const recv_thread = try std.Thread.spawn(Context.receiver, &context);
+ const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context});
+ const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context});
- send_thread.wait();
- recv_thread.wait();
+ send_thread.join();
+ recv_thread.join();
}
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig
index 356a4e2046..2a18711231 100644
--- a/lib/std/Thread/Futex.zig
+++ b/lib/std/Thread/Futex.zig
@@ -64,9 +64,8 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}
/// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`.
/// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`.
pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- if (num_waiters == 0 or single_threaded) {
- return;
- }
+ if (single_threaded) return;
+ if (num_waiters == 0) return;
return OsFutex.wake(ptr, num_waiters);
}
@@ -80,7 +79,23 @@ else if (target.isDarwin())
else if (std.builtin.link_libc)
PosixFutex
else
- @compileError("Operating System unsupported");
+ UnsupportedFutex;
+
+const UnsupportedFutex = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
+ return unsupported(.{ ptr, expect, timeout });
+ }
+
+ fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
+ return unsupported(.{ ptr, num_waiters });
+ }
+
+ fn unsupported(unused: anytype) noreturn {
+ @compileLog("Unsupported operating system", target.os.tag);
+ _ = unused;
+ unreachable;
+ }
+};
const WindowsFutex = struct {
const windows = std.os.windows;
@@ -391,75 +406,73 @@ test "Futex - wait/wake" {
}
test "Futex - Signal" {
- if (!single_threaded) {
- return;
+ if (single_threaded) {
+ return error.SkipZigTest;
}
- try (struct {
+ const Paddle = struct {
value: Atomic(u32) = Atomic(u32).init(0),
+ current: u32 = 0,
- const Self = @This();
+ fn run(self: *@This(), hit_to: *@This()) !void {
+ var iterations: usize = 4;
+ while (iterations > 0) : (iterations -= 1) {
+ var value: u32 = undefined;
+ while (true) {
+ value = self.value.load(.Acquire);
+ if (value != self.current) break;
+ Futex.wait(&self.value, self.current, null) catch unreachable;
+ }
- fn send(self: *Self, value: u32) void {
- self.value.store(value, .Release);
- Futex.wake(&self.value, 1);
- }
+ try testing.expectEqual(value, self.current + 1);
+ self.current = value;
- fn recv(self: *Self, expected: u32) void {
- while (true) {
- const value = self.value.load(.Acquire);
- if (value == expected) break;
- Futex.wait(&self.value, value, null) catch unreachable;
+ _ = hit_to.value.fetchAdd(1, .Release);
+ Futex.wake(&hit_to.value, 1);
}
}
+ };
- const Thread = struct {
- tx: *Self,
- rx: *Self,
-
- const start_value = 1;
-
- fn run(self: Thread) void {
- var iterations: u32 = start_value;
- while (iterations < 10) : (iterations += 1) {
- self.rx.recv(iterations);
- self.tx.send(iterations);
- }
- }
- };
-
- fn run() !void {
- var ping = Self{};
- var pong = Self{};
+ var ping = Paddle{};
+ var pong = Paddle{};
- const t1 = try std.Thread.spawn(Thread.run, .{ .rx = &ping, .tx = &pong });
- defer t1.wait();
+ const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong });
+ defer t1.join();
- const t2 = try std.Thread.spawn(Thread.run, .{ .rx = &pong, .tx = &ping });
- defer t2.wait();
+ const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping });
+ defer t2.join();
- ping.send(Thread.start_value);
- }
- }).run();
+ _ = ping.value.fetchAdd(1, .Release);
+ Futex.wake(&ping.value, 1);
}
test "Futex - Broadcast" {
- if (!single_threaded) {
- return;
+ if (single_threaded) {
+ return error.SkipZigTest;
}
- try (struct {
- threads: [10]*std.Thread = undefined,
+ const Context = struct {
+ threads: [4]std.Thread = undefined,
broadcast: Atomic(u32) = Atomic(u32).init(0),
notified: Atomic(usize) = Atomic(usize).init(0),
- const Self = @This();
-
const BROADCAST_EMPTY = 0;
const BROADCAST_SENT = 1;
const BROADCAST_RECEIVED = 2;
- fn runReceiver(self: *Self) void {
+ fn runSender(self: *@This()) !void {
+ self.broadcast.store(BROADCAST_SENT, .Monotonic);
+ Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+
+ while (true) {
+ const broadcast = self.broadcast.load(.Acquire);
+ if (broadcast == BROADCAST_RECEIVED) break;
+ try testing.expectEqual(broadcast, BROADCAST_SENT);
+ Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
+ }
+ }
+
+ fn runReceiver(self: *@This()) void {
while (true) {
const broadcast = self.broadcast.load(.Acquire);
if (broadcast == BROADCAST_SENT) break;
@@ -473,98 +486,77 @@ test "Futex - Broadcast" {
Futex.wake(&self.broadcast, 1);
}
}
+ };
- fn run() !void {
- var self = Self{};
-
- for (self.threads) |*thread|
- thread.* = try std.Thread.spawn(runReceiver, &self);
- defer for (self.threads) |thread|
- thread.wait();
+ var ctx = Context{};
+ for (ctx.threads) |*thread|
+ thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
+ defer for (ctx.threads) |thread|
+ thread.join();
- std.time.sleep(16 * std.time.ns_per_ms);
- self.broadcast.store(BROADCAST_SENT, .Monotonic);
- Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+ // Try to wait for the threads to start before running runSender().
+ // NOTE: not actually needed for correctness.
+ std.time.sleep(16 * std.time.ns_per_ms);
+ try ctx.runSender();
- while (true) {
- const broadcast = self.broadcast.load(.Acquire);
- if (broadcast == BROADCAST_RECEIVED) break;
- try testing.expectEqual(broadcast, BROADCAST_SENT);
- Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
- }
-
- const notified = self.notified.load(.Monotonic);
- try testing.expectEqual(notified, self.threads.len);
- }
- }).run();
+ const notified = ctx.notified.load(.Monotonic);
+ try testing.expectEqual(notified, ctx.threads.len);
}
test "Futex - Chain" {
- if (!single_threaded) {
- return;
+ if (single_threaded) {
+ return error.SkipZigTest;
}
- try (struct {
- completed: Signal = .{},
- threads: [10]struct {
- thread: *std.Thread,
- signal: Signal,
- } = undefined,
-
- const Signal = struct {
- state: Atomic(u32) = Atomic(u32).init(0),
-
- fn wait(self: *Signal) void {
- while (true) {
- const value = self.value.load(.Acquire);
- if (value == 1) break;
- assert(value == 0);
- Futex.wait(&self.value, 0, null) catch unreachable;
- }
- }
+ const Signal = struct {
+ value: Atomic(u32) = Atomic(u32).init(0),
- fn notify(self: *Signal) void {
- assert(self.value.load(.Unordered) == 0);
- self.value.store(1, .Release);
- Futex.wake(&self.value, 1);
+ fn wait(self: *@This()) void {
+ while (true) {
+ const value = self.value.load(.Acquire);
+ if (value == 1) break;
+ assert(value == 0);
+ Futex.wait(&self.value, 0, null) catch unreachable;
}
- };
+ }
- const Self = @This();
- const Chain = struct {
- self: *Self,
- index: usize,
+ fn notify(self: *@This()) void {
+ assert(self.value.load(.Unordered) == 0);
+ self.value.store(1, .Release);
+ Futex.wake(&self.value, 1);
+ }
+ };
- fn run(chain: Chain) void {
- const this_signal = &chain.self.threads[chain.index].signal;
+ const Context = struct {
+ completed: Signal = .{},
+ threads: [4]struct {
+ thread: std.Thread,
+ signal: Signal,
+ } = undefined,
- var next_signal = &chain.self.completed;
- if (chain.index + 1 < chain.self.threads.len) {
- next_signal = &chain.self.threads[chain.index + 1].signal;
- }
+ fn run(self: *@This(), index: usize) void {
+ const this_signal = &self.threads[index].signal;
- this_signal.wait();
- next_signal.notify();
+ var next_signal = &self.completed;
+ if (index + 1 < self.threads.len) {
+ next_signal = &self.threads[index + 1].signal;
}
- };
- fn run() !void {
- var self = Self{};
+ this_signal.wait();
+ next_signal.notify();
+ }
+ };
- for (self.threads) |*entry, index| {
- entry.signal = .{};
- entry.thread = try std.Thread.spawn(Chain.run, .{
- .self = &self,
- .index = index,
- });
- }
+ var ctx = Context{};
+ for (ctx.threads) |*entry, index| {
+ entry.signal = .{};
+ entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index });
+ }
- self.threads[0].signal.notify();
- self.completed.wait();
+ ctx.threads[0].signal.notify();
+ ctx.completed.wait();
- for (self.threads) |entry| {
- entry.thread.wait();
- }
- }
- }).run();
+ for (ctx.threads) |entry| {
+ entry.thread.join();
+ }
}
diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig
index 49f138079d..35095b2a3c 100644
--- a/lib/std/Thread/Mutex.zig
+++ b/lib/std/Thread/Mutex.zig
@@ -297,12 +297,12 @@ test "basic usage" {
try testing.expect(context.data == TestContext.incr_count);
} else {
const thread_count = 10;
- var threads: [thread_count]*std.Thread = undefined;
+ var threads: [thread_count]std.Thread = undefined;
for (threads) |*t| {
- t.* = try std.Thread.spawn(worker, &context);
+ t.* = try std.Thread.spawn(.{}, worker, .{&context});
}
for (threads) |t|
- t.wait();
+ t.join();
try testing.expect(context.data == thread_count * TestContext.incr_count);
}
diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig
index f161b46aa0..356b8eb78d 100644
--- a/lib/std/Thread/ResetEvent.zig
+++ b/lib/std/Thread/ResetEvent.zig
@@ -281,8 +281,8 @@ test "basic usage" {
var context: Context = undefined;
try context.init();
defer context.deinit();
- const receiver = try std.Thread.spawn(Context.receiver, &context);
- defer receiver.wait();
+ const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
+ defer receiver.join();
try context.sender();
if (false) {
@@ -290,8 +290,8 @@ test "basic usage" {
// https://github.com/ziglang/zig/issues/7009
var timed = Context.init();
defer timed.deinit();
- const sleeper = try std.Thread.spawn(Context.sleeper, &timed);
- defer sleeper.wait();
+ const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
+ defer sleeper.join();
try timed.timedWaiter();
}
}
diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig
index 6f869e0d89..40974938d0 100644
--- a/lib/std/Thread/StaticResetEvent.zig
+++ b/lib/std/Thread/StaticResetEvent.zig
@@ -384,8 +384,8 @@ test "basic usage" {
};
var context = Context{};
- const receiver = try std.Thread.spawn(Context.receiver, &context);
- defer receiver.wait();
+ const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
+ defer receiver.join();
try context.sender();
if (false) {
@@ -393,8 +393,8 @@ test "basic usage" {
// https://github.com/ziglang/zig/issues/7009
var timed = Context.init();
defer timed.deinit();
- const sleeper = try std.Thread.spawn(Context.sleeper, &timed);
- defer sleeper.wait();
+ const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
+ defer sleeper.join();
try timed.timedWaiter();
}
}