From 0d1cd0d4822628c104890af4c31cdf38c6f96d35 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 15 Dec 2020 18:30:29 -0700 Subject: use kprotty's ThreadPool implementation (v5) --- src/ThreadPool.zig | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 src/ThreadPool.zig (limited to 'src/ThreadPool.zig') diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 0000000000..a5f59a30e7 --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,116 @@ +const std = @import("std"); +const ThreadPool = @This(); + +lock: std.Mutex = .{}, +is_running: bool = true, +allocator: *std.mem.Allocator, +running: usize = 0, +threads: []*std.Thread, +run_queue: RunQueue = .{}, +idle_queue: IdleQueue = .{}, + +const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const RunQueue = std.SinglyLinkedList(Runnable); +const Runnable = struct { + runFn: fn (*Runnable) void, +}; + +pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { + self.* = .{ + .allocator = allocator, + .threads = &[_]*std.Thread{}, + }; + + errdefer self.deinit(); + + var num_threads = std.Thread.cpuCount() catch 1; + if (num_threads > 0) + self.threads = try allocator.alloc(*std.Thread, num_threads); + + while (num_threads > 0) : (num_threads -= 1) { + const thread = try std.Thread.spawn(self, runWorker); + self.threads[self.running] = thread; + self.running += 1; + } +} + +pub fn deinit(self: *ThreadPool) void { + self.shutdown(); + + std.debug.assert(!self.is_running); + for (self.threads[0..self.running]) |thread| + thread.wait(); + + defer self.threads = &[_]*std.Thread{}; + if (self.running > 0) + self.allocator.free(self.threads); +} + +pub fn shutdown(self: *ThreadPool) void { + const held = self.lock.acquire(); + + if (!self.is_running) + return held.release(); + + var idle_queue = self.idle_queue; + self.idle_queue = .{}; + self.is_running = false; + held.release(); + + while (idle_queue.popFirst()) 
|idle_node| + idle_node.data.set(); +} + +pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + const Args = @TypeOf(args); + const Closure = struct { + arguments: Args, + pool: *ThreadPool, + run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + + fn runFn(runnable: *Runnable) void { + const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); + const closure = @fieldParentPtr(@This(), "run_node", run_node); + const result = @call(.{}, func, closure.arguments); + closure.pool.allocator.destroy(closure); + } + }; + + const closure = try self.allocator.create(Closure); + errdefer self.allocator.destroy(closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + const held = self.lock.acquire(); + self.run_queue.prepend(&closure.run_node); + + const idle_node = self.idle_queue.popFirst(); + held.release(); + + if (idle_node) |node| + node.data.set(); +} + +fn runWorker(self: *ThreadPool) void { + while (true) { + const held = self.lock.acquire(); + + if (self.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (!self.is_running) { + held.release(); + return; + } + + var idle_node = IdleQueue.Node{ .data = .{} }; + self.idle_queue.prepend(&idle_node); + held.release(); + idle_node.data.wait(); + } +} -- cgit v1.2.3 From b2f8631a3c9b2cc04a4c78f38d164130be2fb1ae Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 18 Dec 2020 21:50:44 -0700 Subject: ThreadPool: delete dead code If this errdefer did get run it would constitute a race condition. So I deleted the dead code for clarity. 
--- lib/std/Progress.zig | 310 +++++++++++++++++++++++++++++++++++++++++++++++++++ lib/std/progress.zig | 310 --------------------------------------------------- src/ThreadPool.zig | 1 - 3 files changed, 310 insertions(+), 311 deletions(-) create mode 100644 lib/std/Progress.zig delete mode 100644 lib/std/progress.zig (limited to 'src/ThreadPool.zig') diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig new file mode 100644 index 0000000000..82f2801fa1 --- /dev/null +++ b/lib/std/Progress.zig @@ -0,0 +1,310 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. +const std = @import("std"); +const windows = std.os.windows; +const testing = std.testing; +const assert = std.debug.assert; + +/// This API is non-allocating and non-fallible. The tradeoff is that users of +/// this API must provide the storage for each `Progress.Node`. +/// Initialize the struct directly, overriding these fields as desired: +/// * `refresh_rate_ms` +/// * `initial_delay_ms` +pub const Progress = struct { + /// `null` if the current node (and its children) should + /// not print on update() + terminal: ?std.fs.File = undefined, + + /// Whether the terminal supports ANSI escape codes. + supports_ansi_escape_codes: bool = false, + + root: Node = undefined, + + /// Keeps track of how much time has passed since the beginning. + /// Used to compare with `initial_delay_ms` and `refresh_rate_ms`. + timer: std.time.Timer = undefined, + + /// When the previous refresh was written to the terminal. + /// Used to compare with `refresh_rate_ms`. + prev_refresh_timestamp: u64 = undefined, + + /// This buffer represents the maximum number of bytes written to the terminal + /// with each refresh. 
+ output_buffer: [100]u8 = undefined, + + /// How many nanoseconds between writing updates to the terminal. + refresh_rate_ns: u64 = 50 * std.time.ns_per_ms, + + /// How many nanoseconds to keep the output hidden + initial_delay_ns: u64 = 500 * std.time.ns_per_ms, + + done: bool = true, + + /// Keeps track of how many columns in the terminal have been output, so that + /// we can move the cursor back later. + columns_written: usize = undefined, + + /// Represents one unit of progress. Each node can have children nodes, or + /// one can use integers with `update`. + pub const Node = struct { + context: *Progress, + parent: ?*Node, + completed_items: usize, + name: []const u8, + recently_updated_child: ?*Node = null, + + /// This field may be updated freely. + estimated_total_items: ?usize, + + /// Create a new child progress node. + /// Call `Node.end` when done. + /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this + /// API to set `self.parent.recently_updated_child` with the return value. + /// Until that is fixed you probably want to call `activate` on the return value. + pub fn start(self: *Node, name: []const u8, estimated_total_items: ?usize) Node { + return Node{ + .context = self.context, + .parent = self, + .completed_items = 0, + .name = name, + .estimated_total_items = estimated_total_items, + }; + } + + /// This is the same as calling `start` and then `end` on the returned `Node`. 
+ pub fn completeOne(self: *Node) void { + if (self.parent) |parent| parent.recently_updated_child = self; + self.completed_items += 1; + self.context.maybeRefresh(); + } + + pub fn end(self: *Node) void { + self.context.maybeRefresh(); + if (self.parent) |parent| { + if (parent.recently_updated_child) |parent_child| { + if (parent_child == self) { + parent.recently_updated_child = null; + } + } + parent.completeOne(); + } else { + self.context.done = true; + self.context.refresh(); + } + } + + /// Tell the parent node that this node is actively being worked on. + pub fn activate(self: *Node) void { + if (self.parent) |parent| parent.recently_updated_child = self; + } + }; + + /// Create a new progress node. + /// Call `Node.end` when done. + /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this + /// API to return Progress rather than accept it as a parameter. + pub fn start(self: *Progress, name: []const u8, estimated_total_items: ?usize) !*Node { + const stderr = std.io.getStdErr(); + self.terminal = null; + if (stderr.supportsAnsiEscapeCodes()) { + self.terminal = stderr; + self.supports_ansi_escape_codes = true; + } else if (std.builtin.os.tag == .windows and stderr.isTty()) { + self.terminal = stderr; + } + self.root = Node{ + .context = self, + .parent = null, + .completed_items = 0, + .name = name, + .estimated_total_items = estimated_total_items, + }; + self.columns_written = 0; + self.prev_refresh_timestamp = 0; + self.timer = try std.time.Timer.start(); + self.done = false; + return &self.root; + } + + /// Updates the terminal if enough time has passed since last update. + pub fn maybeRefresh(self: *Progress) void { + const now = self.timer.read(); + if (now < self.initial_delay_ns) return; + if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return; + self.refresh(); + } + + /// Updates the terminal and resets `self.next_refresh_timestamp`. 
+ pub fn refresh(self: *Progress) void { + const file = self.terminal orelse return; + + const prev_columns_written = self.columns_written; + var end: usize = 0; + if (self.columns_written > 0) { + // restore the cursor position by moving the cursor + // `columns_written` cells to the left, then clear the rest of the + // line + if (self.supports_ansi_escape_codes) { + end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[{}D", .{self.columns_written}) catch unreachable).len; + end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[0K", .{}) catch unreachable).len; + } else if (std.builtin.os.tag == .windows) winapi: { + var info: windows.CONSOLE_SCREEN_BUFFER_INFO = undefined; + if (windows.kernel32.GetConsoleScreenBufferInfo(file.handle, &info) != windows.TRUE) + unreachable; + + var cursor_pos = windows.COORD{ + .X = info.dwCursorPosition.X - @intCast(windows.SHORT, self.columns_written), + .Y = info.dwCursorPosition.Y, + }; + + if (cursor_pos.X < 0) + cursor_pos.X = 0; + + const fill_chars = @intCast(windows.DWORD, info.dwSize.X - cursor_pos.X); + + var written: windows.DWORD = undefined; + if (windows.kernel32.FillConsoleOutputAttribute( + file.handle, + info.wAttributes, + fill_chars, + cursor_pos, + &written, + ) != windows.TRUE) { + // Stop trying to write to this file. + self.terminal = null; + break :winapi; + } + if (windows.kernel32.FillConsoleOutputCharacterA( + file.handle, + ' ', + fill_chars, + cursor_pos, + &written, + ) != windows.TRUE) unreachable; + + if (windows.kernel32.SetConsoleCursorPosition(file.handle, cursor_pos) != windows.TRUE) + unreachable; + } else unreachable; + + self.columns_written = 0; + } + + if (!self.done) { + var need_ellipse = false; + var maybe_node: ?*Node = &self.root; + while (maybe_node) |node| { + if (need_ellipse) { + self.bufWrite(&end, "... 
", .{}); + } + need_ellipse = false; + if (node.name.len != 0 or node.estimated_total_items != null) { + if (node.name.len != 0) { + self.bufWrite(&end, "{}", .{node.name}); + need_ellipse = true; + } + if (node.estimated_total_items) |total| { + if (need_ellipse) self.bufWrite(&end, " ", .{}); + self.bufWrite(&end, "[{}/{}] ", .{ node.completed_items + 1, total }); + need_ellipse = false; + } else if (node.completed_items != 0) { + if (need_ellipse) self.bufWrite(&end, " ", .{}); + self.bufWrite(&end, "[{}] ", .{node.completed_items + 1}); + need_ellipse = false; + } + } + maybe_node = node.recently_updated_child; + } + if (need_ellipse) { + self.bufWrite(&end, "... ", .{}); + } + } + + _ = file.write(self.output_buffer[0..end]) catch |e| { + // Stop trying to write to this file once it errors. + self.terminal = null; + }; + self.prev_refresh_timestamp = self.timer.read(); + } + + pub fn log(self: *Progress, comptime format: []const u8, args: anytype) void { + const file = self.terminal orelse return; + self.refresh(); + file.outStream().print(format, args) catch { + self.terminal = null; + return; + }; + self.columns_written = 0; + } + + fn bufWrite(self: *Progress, end: *usize, comptime format: []const u8, args: anytype) void { + if (std.fmt.bufPrint(self.output_buffer[end.*..], format, args)) |written| { + const amt = written.len; + end.* += amt; + self.columns_written += amt; + } else |err| switch (err) { + error.NoSpaceLeft => { + self.columns_written += self.output_buffer.len - end.*; + end.* = self.output_buffer.len; + }, + } + const bytes_needed_for_esc_codes_at_end = if (std.builtin.os.tag == .windows) 0 else 11; + const max_end = self.output_buffer.len - bytes_needed_for_esc_codes_at_end; + if (end.* > max_end) { + const suffix = "... 
"; + self.columns_written = self.columns_written - (end.* - max_end) + suffix.len; + std.mem.copy(u8, self.output_buffer[max_end..], suffix); + end.* = max_end + suffix.len; + } + } +}; + +test "basic functionality" { + var disable = true; + if (disable) { + // This test is disabled because it uses time.sleep() and is therefore slow. It also + // prints bogus progress data to stderr. + return error.SkipZigTest; + } + var progress = Progress{}; + const root_node = try progress.start("", 100); + defer root_node.end(); + + const sub_task_names = [_][]const u8{ + "reticulating splines", + "adjusting shoes", + "climbing towers", + "pouring juice", + }; + var next_sub_task: usize = 0; + + var i: usize = 0; + while (i < 100) : (i += 1) { + var node = root_node.start(sub_task_names[next_sub_task], 5); + node.activate(); + next_sub_task = (next_sub_task + 1) % sub_task_names.len; + + node.completeOne(); + std.time.sleep(5 * std.time.ns_per_ms); + node.completeOne(); + node.completeOne(); + std.time.sleep(5 * std.time.ns_per_ms); + node.completeOne(); + node.completeOne(); + std.time.sleep(5 * std.time.ns_per_ms); + + node.end(); + + std.time.sleep(5 * std.time.ns_per_ms); + } + { + var node = root_node.start("this is a really long name designed to activate the truncation code. let's find out if it works", null); + node.activate(); + std.time.sleep(10 * std.time.ns_per_ms); + progress.refresh(); + std.time.sleep(10 * std.time.ns_per_ms); + node.end(); + } +} diff --git a/lib/std/progress.zig b/lib/std/progress.zig deleted file mode 100644 index 82f2801fa1..0000000000 --- a/lib/std/progress.zig +++ /dev/null @@ -1,310 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2020 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. 
-const std = @import("std"); -const windows = std.os.windows; -const testing = std.testing; -const assert = std.debug.assert; - -/// This API is non-allocating and non-fallible. The tradeoff is that users of -/// this API must provide the storage for each `Progress.Node`. -/// Initialize the struct directly, overriding these fields as desired: -/// * `refresh_rate_ms` -/// * `initial_delay_ms` -pub const Progress = struct { - /// `null` if the current node (and its children) should - /// not print on update() - terminal: ?std.fs.File = undefined, - - /// Whether the terminal supports ANSI escape codes. - supports_ansi_escape_codes: bool = false, - - root: Node = undefined, - - /// Keeps track of how much time has passed since the beginning. - /// Used to compare with `initial_delay_ms` and `refresh_rate_ms`. - timer: std.time.Timer = undefined, - - /// When the previous refresh was written to the terminal. - /// Used to compare with `refresh_rate_ms`. - prev_refresh_timestamp: u64 = undefined, - - /// This buffer represents the maximum number of bytes written to the terminal - /// with each refresh. - output_buffer: [100]u8 = undefined, - - /// How many nanoseconds between writing updates to the terminal. - refresh_rate_ns: u64 = 50 * std.time.ns_per_ms, - - /// How many nanoseconds to keep the output hidden - initial_delay_ns: u64 = 500 * std.time.ns_per_ms, - - done: bool = true, - - /// Keeps track of how many columns in the terminal have been output, so that - /// we can move the cursor back later. - columns_written: usize = undefined, - - /// Represents one unit of progress. Each node can have children nodes, or - /// one can use integers with `update`. - pub const Node = struct { - context: *Progress, - parent: ?*Node, - completed_items: usize, - name: []const u8, - recently_updated_child: ?*Node = null, - - /// This field may be updated freely. - estimated_total_items: ?usize, - - /// Create a new child progress node. - /// Call `Node.end` when done. 
- /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this - /// API to set `self.parent.recently_updated_child` with the return value. - /// Until that is fixed you probably want to call `activate` on the return value. - pub fn start(self: *Node, name: []const u8, estimated_total_items: ?usize) Node { - return Node{ - .context = self.context, - .parent = self, - .completed_items = 0, - .name = name, - .estimated_total_items = estimated_total_items, - }; - } - - /// This is the same as calling `start` and then `end` on the returned `Node`. - pub fn completeOne(self: *Node) void { - if (self.parent) |parent| parent.recently_updated_child = self; - self.completed_items += 1; - self.context.maybeRefresh(); - } - - pub fn end(self: *Node) void { - self.context.maybeRefresh(); - if (self.parent) |parent| { - if (parent.recently_updated_child) |parent_child| { - if (parent_child == self) { - parent.recently_updated_child = null; - } - } - parent.completeOne(); - } else { - self.context.done = true; - self.context.refresh(); - } - } - - /// Tell the parent node that this node is actively being worked on. - pub fn activate(self: *Node) void { - if (self.parent) |parent| parent.recently_updated_child = self; - } - }; - - /// Create a new progress node. - /// Call `Node.end` when done. - /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this - /// API to return Progress rather than accept it as a parameter. 
- pub fn start(self: *Progress, name: []const u8, estimated_total_items: ?usize) !*Node { - const stderr = std.io.getStdErr(); - self.terminal = null; - if (stderr.supportsAnsiEscapeCodes()) { - self.terminal = stderr; - self.supports_ansi_escape_codes = true; - } else if (std.builtin.os.tag == .windows and stderr.isTty()) { - self.terminal = stderr; - } - self.root = Node{ - .context = self, - .parent = null, - .completed_items = 0, - .name = name, - .estimated_total_items = estimated_total_items, - }; - self.columns_written = 0; - self.prev_refresh_timestamp = 0; - self.timer = try std.time.Timer.start(); - self.done = false; - return &self.root; - } - - /// Updates the terminal if enough time has passed since last update. - pub fn maybeRefresh(self: *Progress) void { - const now = self.timer.read(); - if (now < self.initial_delay_ns) return; - if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return; - self.refresh(); - } - - /// Updates the terminal and resets `self.next_refresh_timestamp`. 
- pub fn refresh(self: *Progress) void { - const file = self.terminal orelse return; - - const prev_columns_written = self.columns_written; - var end: usize = 0; - if (self.columns_written > 0) { - // restore the cursor position by moving the cursor - // `columns_written` cells to the left, then clear the rest of the - // line - if (self.supports_ansi_escape_codes) { - end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[{}D", .{self.columns_written}) catch unreachable).len; - end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[0K", .{}) catch unreachable).len; - } else if (std.builtin.os.tag == .windows) winapi: { - var info: windows.CONSOLE_SCREEN_BUFFER_INFO = undefined; - if (windows.kernel32.GetConsoleScreenBufferInfo(file.handle, &info) != windows.TRUE) - unreachable; - - var cursor_pos = windows.COORD{ - .X = info.dwCursorPosition.X - @intCast(windows.SHORT, self.columns_written), - .Y = info.dwCursorPosition.Y, - }; - - if (cursor_pos.X < 0) - cursor_pos.X = 0; - - const fill_chars = @intCast(windows.DWORD, info.dwSize.X - cursor_pos.X); - - var written: windows.DWORD = undefined; - if (windows.kernel32.FillConsoleOutputAttribute( - file.handle, - info.wAttributes, - fill_chars, - cursor_pos, - &written, - ) != windows.TRUE) { - // Stop trying to write to this file. - self.terminal = null; - break :winapi; - } - if (windows.kernel32.FillConsoleOutputCharacterA( - file.handle, - ' ', - fill_chars, - cursor_pos, - &written, - ) != windows.TRUE) unreachable; - - if (windows.kernel32.SetConsoleCursorPosition(file.handle, cursor_pos) != windows.TRUE) - unreachable; - } else unreachable; - - self.columns_written = 0; - } - - if (!self.done) { - var need_ellipse = false; - var maybe_node: ?*Node = &self.root; - while (maybe_node) |node| { - if (need_ellipse) { - self.bufWrite(&end, "... 
", .{}); - } - need_ellipse = false; - if (node.name.len != 0 or node.estimated_total_items != null) { - if (node.name.len != 0) { - self.bufWrite(&end, "{}", .{node.name}); - need_ellipse = true; - } - if (node.estimated_total_items) |total| { - if (need_ellipse) self.bufWrite(&end, " ", .{}); - self.bufWrite(&end, "[{}/{}] ", .{ node.completed_items + 1, total }); - need_ellipse = false; - } else if (node.completed_items != 0) { - if (need_ellipse) self.bufWrite(&end, " ", .{}); - self.bufWrite(&end, "[{}] ", .{node.completed_items + 1}); - need_ellipse = false; - } - } - maybe_node = node.recently_updated_child; - } - if (need_ellipse) { - self.bufWrite(&end, "... ", .{}); - } - } - - _ = file.write(self.output_buffer[0..end]) catch |e| { - // Stop trying to write to this file once it errors. - self.terminal = null; - }; - self.prev_refresh_timestamp = self.timer.read(); - } - - pub fn log(self: *Progress, comptime format: []const u8, args: anytype) void { - const file = self.terminal orelse return; - self.refresh(); - file.outStream().print(format, args) catch { - self.terminal = null; - return; - }; - self.columns_written = 0; - } - - fn bufWrite(self: *Progress, end: *usize, comptime format: []const u8, args: anytype) void { - if (std.fmt.bufPrint(self.output_buffer[end.*..], format, args)) |written| { - const amt = written.len; - end.* += amt; - self.columns_written += amt; - } else |err| switch (err) { - error.NoSpaceLeft => { - self.columns_written += self.output_buffer.len - end.*; - end.* = self.output_buffer.len; - }, - } - const bytes_needed_for_esc_codes_at_end = if (std.builtin.os.tag == .windows) 0 else 11; - const max_end = self.output_buffer.len - bytes_needed_for_esc_codes_at_end; - if (end.* > max_end) { - const suffix = "... 
"; - self.columns_written = self.columns_written - (end.* - max_end) + suffix.len; - std.mem.copy(u8, self.output_buffer[max_end..], suffix); - end.* = max_end + suffix.len; - } - } -}; - -test "basic functionality" { - var disable = true; - if (disable) { - // This test is disabled because it uses time.sleep() and is therefore slow. It also - // prints bogus progress data to stderr. - return error.SkipZigTest; - } - var progress = Progress{}; - const root_node = try progress.start("", 100); - defer root_node.end(); - - const sub_task_names = [_][]const u8{ - "reticulating splines", - "adjusting shoes", - "climbing towers", - "pouring juice", - }; - var next_sub_task: usize = 0; - - var i: usize = 0; - while (i < 100) : (i += 1) { - var node = root_node.start(sub_task_names[next_sub_task], 5); - node.activate(); - next_sub_task = (next_sub_task + 1) % sub_task_names.len; - - node.completeOne(); - std.time.sleep(5 * std.time.ns_per_ms); - node.completeOne(); - node.completeOne(); - std.time.sleep(5 * std.time.ns_per_ms); - node.completeOne(); - node.completeOne(); - std.time.sleep(5 * std.time.ns_per_ms); - - node.end(); - - std.time.sleep(5 * std.time.ns_per_ms); - } - { - var node = root_node.start("this is a really long name designed to activate the truncation code. 
let's find out if it works", null); - node.activate(); - std.time.sleep(10 * std.time.ns_per_ms); - progress.refresh(); - std.time.sleep(10 * std.time.ns_per_ms); - node.end(); - } -} diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index a5f59a30e7..6a59b684be 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -77,7 +77,6 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { }; const closure = try self.allocator.create(Closure); - errdefer self.allocator.destroy(closure); closure.* = .{ .arguments = args, .pool = self, -- cgit v1.2.3 From 4e621d4260ed752995dedc50a240931fc0e0941f Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Dec 2020 15:03:03 -0700 Subject: workaround for std lib AutoResetEvent bug --- lib/std/auto_reset_event.zig | 44 ++++++++++++++++++++++---------------------- src/Event.zig | 43 +++++++++++++++++++++++++++++++++++++++++++ src/ThreadPool.zig | 5 +++++ src/WaitGroup.zig | 24 ++++++++++++++++-------- 4 files changed, 86 insertions(+), 30 deletions(-) create mode 100644 src/Event.zig (limited to 'src/ThreadPool.zig') diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig index 7e13dc1aba..3c7e65e362 100644 --- a/lib/std/auto_reset_event.zig +++ b/lib/std/auto_reset_event.zig @@ -11,33 +11,33 @@ const assert = std.debug.assert; /// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`. /// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like). pub const AutoResetEvent = struct { - // AutoResetEvent has 3 possible states: - // - UNSET: the AutoResetEvent is currently unset - // - SET: the AutoResetEvent was notified before a wait() was called - // - : there is an active waiter waiting for a notification. - // - // When attempting to wait: - // if the event is unset, it registers a ResetEvent pointer to be notified when the event is set - // if the event is already set, then it consumes the notification and resets the event. 
- // - // When attempting to notify: - // if the event is unset, then we set the event - // if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent - // - // This ensures that the event is automatically reset after a wait() has been issued - // and avoids the race condition when using std.ResetEvent in the following scenario: - // thread 1 | thread 2 - // std.ResetEvent.wait() | - // | std.ResetEvent.set() - // | std.ResetEvent.set() - // std.ResetEvent.reset() | - // std.ResetEvent.wait() | (missed the second .set() notification above) + /// AutoResetEvent has 3 possible states: + /// - UNSET: the AutoResetEvent is currently unset + /// - SET: the AutoResetEvent was notified before a wait() was called + /// - : there is an active waiter waiting for a notification. + /// + /// When attempting to wait: + /// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set + /// if the event is already set, then it consumes the notification and resets the event. 
+ /// + /// When attempting to notify: + /// if the event is unset, then we set the event + /// if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent + /// + /// This ensures that the event is automatically reset after a wait() has been issued + /// and avoids the race condition when using std.ResetEvent in the following scenario: + /// thread 1 | thread 2 + /// std.ResetEvent.wait() | + /// | std.ResetEvent.set() + /// | std.ResetEvent.set() + /// std.ResetEvent.reset() | + /// std.ResetEvent.wait() | (missed the second .set() notification above) state: usize = UNSET, const UNSET = 0; const SET = 1; - // the minimum alignment for the `*std.ResetEvent` created by wait*() + /// the minimum alignment for the `*std.ResetEvent` created by wait*() const event_align = std.math.max(@alignOf(std.ResetEvent), 2); pub fn wait(self: *AutoResetEvent) void { diff --git a/src/Event.zig b/src/Event.zig new file mode 100644 index 0000000000..2b8d7be998 --- /dev/null +++ b/src/Event.zig @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. 
+const std = @import("std"); +const Event = @This(); + +lock: std.Mutex = .{}, +event: std.ResetEvent = undefined, +state: enum { empty, waiting, notified } = .empty, + +pub fn wait(self: *Event) void { + const held = self.lock.acquire(); + + switch (self.state) { + .empty => { + self.state = .waiting; + self.event = @TypeOf(self.event).init(); + held.release(); + self.event.wait(); + self.event.deinit(); + }, + .waiting => unreachable, + .notified => held.release(), + } +} + +pub fn set(self: *Event) void { + const held = self.lock.acquire(); + + switch (self.state) { + .empty => { + self.state = .notified; + held.release(); + }, + .waiting => { + held.release(); + self.event.set(); + }, + .notified => unreachable, + } +} diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 6a59b684be..7d6af3d24c 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -1,3 +1,8 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. const std = @import("std"); const ThreadPool = @This(); diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index c33d084c28..f17ab580d3 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -1,9 +1,15 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. 
const std = @import("std"); const WaitGroup = @This(); +const Event = @import("Event.zig"); lock: std.Mutex = .{}, counter: usize = 0, -event: std.AutoResetEvent = .{}, +event: Event = .{}, pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -22,13 +28,15 @@ pub fn stop(self: *WaitGroup) void { } pub fn wait(self: *WaitGroup) void { - { - const held = self.lock.acquire(); - defer held.release(); + while (true) { + { + const held = self.lock.acquire(); + defer held.release(); - if (self.counter == 0) - return; - } + if (self.counter == 0) + return; + } - self.event.wait(); + self.event.wait(); + } } -- cgit v1.2.3 From 1d94a6893689ad1fb8e06308ae51603a6c8708a8 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 20 Dec 2020 15:37:58 -0700 Subject: add an option to compile zig in single-threaded mode And enable it for Drone CI. I hate to do this, but I need to make progress on other fronts. --- CMakeLists.txt | 10 ++++++++++ ci/drone/linux_script | 3 ++- src/Compilation.zig | 12 +++++++----- src/ThreadPool.zig | 6 ++++++ src/stage1/zig0.cpp | 4 ++++ 5 files changed, 29 insertions(+), 6 deletions(-) (limited to 'src/ThreadPool.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index 2580c06066..e3c4e67c5a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ set(ZIG_TARGET_TRIPLE "native" CACHE STRING "arch-os-abi to output binaries for" set(ZIG_TARGET_MCPU "baseline" CACHE STRING "-mcpu parameter to output binaries for") set(ZIG_EXECUTABLE "" CACHE STRING "(when cross compiling) path to already-built zig binary") set(ZIG_PREFER_LLVM_CONFIG off CACHE BOOL "(when cross compiling) use llvm-config to find target llvm dependencies if needed") +set(ZIG_SINGLE_THREADED off CACHE BOOL "limit the zig compiler to use only 1 thread") find_package(llvm) find_package(clang) @@ -510,10 +511,13 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/src/Cache.zig" "${CMAKE_SOURCE_DIR}/src/Compilation.zig" "${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig" + 
"${CMAKE_SOURCE_DIR}/src/Event.zig" "${CMAKE_SOURCE_DIR}/src/Module.zig" "${CMAKE_SOURCE_DIR}/src/Package.zig" "${CMAKE_SOURCE_DIR}/src/RangeSet.zig" + "${CMAKE_SOURCE_DIR}/src/ThreadPool.zig" "${CMAKE_SOURCE_DIR}/src/TypedValue.zig" + "${CMAKE_SOURCE_DIR}/src/WaitGroup.zig" "${CMAKE_SOURCE_DIR}/src/astgen.zig" "${CMAKE_SOURCE_DIR}/src/clang.zig" "${CMAKE_SOURCE_DIR}/src/clang_options.zig" @@ -713,6 +717,11 @@ if("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") else() set(ZIG1_RELEASE_ARG -OReleaseFast --strip) endif() +if(ZIG_SINGLE_THREADED) + set(ZIG1_SINGLE_THREADED_ARG "--single-threaded") +else() + set(ZIG1_SINGLE_THREADED_ARG "") +endif() set(BUILD_ZIG1_ARGS "src/stage1.zig" @@ -722,6 +731,7 @@ set(BUILD_ZIG1_ARGS --override-lib-dir "${CMAKE_SOURCE_DIR}/lib" "-femit-bin=${ZIG1_OBJECT}" "${ZIG1_RELEASE_ARG}" + "${ZIG1_SINGLE_THREADED_ARG}" -lc --pkg-begin build_options "${ZIG_CONFIG_ZIG_OUT}" --pkg-end diff --git a/ci/drone/linux_script b/ci/drone/linux_script index fdc1704fb7..8c5dc1be2a 100755 --- a/ci/drone/linux_script +++ b/ci/drone/linux_script @@ -17,7 +17,8 @@ git config core.abbrev 9 mkdir build cd build -cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja +# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON +cmake .. 
-DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON samu install ./zig build test -Dskip-release -Dskip-non-native diff --git a/src/Compilation.zig b/src/Compilation.zig index 1c27a589ee..23f67f5b37 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -1728,7 +1728,7 @@ fn workerUpdateCObject( error.AnalysisFail => return, else => { { - var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1) catch { fatal("TODO handle this by setting c_object.status = oom failure", .{}); @@ -1759,7 +1759,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * if (c_object.clearStatus(comp.gpa)) { // There was previous failure. - var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); comp.failed_c_objects.removeAssertDiscard(c_object); } @@ -1789,10 +1789,12 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * { const is_collision = blk: { - var lock = comp.mutex.acquire(); + const bin_digest = man.hash.peekBin(); + + const lock = comp.mutex.acquire(); defer lock.release(); - const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, man.hash.peekBin()); + const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, bin_digest); break :blk gop.found_existing; }; if (is_collision) { @@ -2211,7 +2213,7 @@ fn failCObj(comp: *Compilation, c_object: *CObject, comptime format: []const u8, fn failCObjWithOwnedErrorMsg(comp: *Compilation, c_object: *CObject, err_msg: *ErrorMsg) InnerError { { - var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); { errdefer err_msg.destroy(comp.gpa); diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 7d6af3d24c..00cb26772a 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ 
-25,6 +25,8 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { .allocator = allocator, .threads = &[_]*std.Thread{}, }; + if (std.builtin.single_threaded) + return; errdefer self.deinit(); @@ -67,6 +69,10 @@ pub fn shutdown(self: *ThreadPool) void { } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + if (std.builtin.single_threaded) { + @call(.{}, func, args); + return; + } const Args = @TypeOf(args); const Closure = struct { arguments: Args, diff --git a/src/stage1/zig0.cpp b/src/stage1/zig0.cpp index 73f5b4f685..bcc9dbc00a 100644 --- a/src/stage1/zig0.cpp +++ b/src/stage1/zig0.cpp @@ -266,6 +266,7 @@ int main(int argc, char **argv) { TargetSubsystem subsystem = TargetSubsystemAuto; const char *override_lib_dir = nullptr; const char *mcpu = nullptr; + bool single_threaded = false; for (int i = 1; i < argc; i += 1) { char *arg = argv[i]; @@ -281,6 +282,8 @@ int main(int argc, char **argv) { optimize_mode = BuildModeSafeRelease; } else if (strcmp(arg, "-OReleaseSmall") == 0) { optimize_mode = BuildModeSmallRelease; + } else if (strcmp(arg, "--single-threaded") == 0) { + single_threaded = true; } else if (strcmp(arg, "--help") == 0) { return print_full_usage(arg0, stdout, EXIT_SUCCESS); } else if (strcmp(arg, "--strip") == 0) { @@ -469,6 +472,7 @@ int main(int argc, char **argv) { stage1->link_libcpp = link_libcpp; stage1->subsystem = subsystem; stage1->pic = true; + stage1->is_single_threaded = single_threaded; zig_stage1_build_object(stage1); -- cgit v1.2.3 From 829c00a77fd2d6b7576c6d2b724f69ba9cfe10f2 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 14:21:51 -0700 Subject: kprotty ThreadPool and WaitGroup patch --- CMakeLists.txt | 1 - ci/drone/linux_script | 3 +- src/Event.zig | 43 ----------------------------- src/ThreadPool.zig | 76 ++++++++++++++++++++++++--------------------------- src/WaitGroup.zig | 38 ++++++++++++++------------ 5 files changed, 57 insertions(+), 104 deletions(-) delete 
mode 100644 src/Event.zig (limited to 'src/ThreadPool.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b4c067ae6..272cdc6921 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -512,7 +512,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/src/Cache.zig" "${CMAKE_SOURCE_DIR}/src/Compilation.zig" "${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig" - "${CMAKE_SOURCE_DIR}/src/Event.zig" "${CMAKE_SOURCE_DIR}/src/Module.zig" "${CMAKE_SOURCE_DIR}/src/Package.zig" "${CMAKE_SOURCE_DIR}/src/RangeSet.zig" diff --git a/ci/drone/linux_script b/ci/drone/linux_script index 8c5dc1be2a..fdc1704fb7 100755 --- a/ci/drone/linux_script +++ b/ci/drone/linux_script @@ -17,8 +17,7 @@ git config core.abbrev 9 mkdir build cd build -# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON -cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON +cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja samu install ./zig build test -Dskip-release -Dskip-non-native diff --git a/src/Event.zig b/src/Event.zig deleted file mode 100644 index 2b8d7be998..0000000000 --- a/src/Event.zig +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2020 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. 
-const std = @import("std"); -const Event = @This(); - -lock: std.Mutex = .{}, -event: std.ResetEvent = undefined, -state: enum { empty, waiting, notified } = .empty, - -pub fn wait(self: *Event) void { - const held = self.lock.acquire(); - - switch (self.state) { - .empty => { - self.state = .waiting; - self.event = @TypeOf(self.event).init(); - held.release(); - self.event.wait(); - self.event.deinit(); - }, - .waiting => unreachable, - .notified => held.release(), - } -} - -pub fn set(self: *Event) void { - const held = self.lock.acquire(); - - switch (self.state) { - .empty => { - self.state = .notified; - held.release(); - }, - .waiting => { - held.release(); - self.event.set(); - }, - .notified => unreachable, - } -} diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 00cb26772a..71c72fb8da 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -9,12 +9,12 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -running: usize = 0, +spawned: usize = 0, threads: []*std.Thread, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, -const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const IdleQueue = std.SinglyLinkedList(std.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: fn (*Runnable) void, @@ -30,49 +30,37 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { errdefer self.deinit(); - var num_threads = std.Thread.cpuCount() catch 1; - if (num_threads > 0) - self.threads = try allocator.alloc(*std.Thread, num_threads); + var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); + self.threads = try allocator.alloc(*std.Thread, num_threads); while (num_threads > 0) : (num_threads -= 1) { const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.running] = thread; - self.running += 1; + self.threads[self.spawned] = thread; + self.spawned += 1; } } pub fn deinit(self: *ThreadPool) void { - 
self.shutdown(); - - std.debug.assert(!self.is_running); - for (self.threads[0..self.running]) |thread| - thread.wait(); - - defer self.threads = &[_]*std.Thread{}; - if (self.running > 0) - self.allocator.free(self.threads); -} - -pub fn shutdown(self: *ThreadPool) void { - const held = self.lock.acquire(); - - if (!self.is_running) - return held.release(); + { + const held = self.lock.acquire(); + defer held.release(); - var idle_queue = self.idle_queue; - self.idle_queue = .{}; - self.is_running = false; - held.release(); + self.is_running = false; + while (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); + } - while (idle_queue.popFirst()) |idle_node| - idle_node.data.set(); + defer self.allocator.free(self.threads); + for (self.threads[0..self.spawned]) |thread| + thread.wait(); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (std.builtin.single_threaded) { - @call(.{}, func, args); + const result = @call(.{}, func, args); return; } + const Args = @TypeOf(args); const Closure = struct { arguments: Args, @@ -83,24 +71,26 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); const closure = @fieldParentPtr(@This(), "run_node", run_node); const result = @call(.{}, func, closure.arguments); + + const held = closure.pool.lock.acquire(); + defer held.release(); closure.pool.allocator.destroy(closure); } }; + const held = self.lock.acquire(); + defer held.release(); + const closure = try self.allocator.create(Closure); closure.* = .{ .arguments = args, .pool = self, }; - const held = self.lock.acquire(); self.run_queue.prepend(&closure.run_node); - const idle_node = self.idle_queue.popFirst(); - held.release(); - - if (idle_node) |node| - node.data.set(); + if (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); } fn runWorker(self: *ThreadPool) void { @@ -113,14 +103,18 @@ fn runWorker(self: *ThreadPool) void { 
continue; } - if (!self.is_running) { + if (self.is_running) { + var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; + defer idle_node.data.deinit(); + + self.idle_queue.prepend(&idle_node); held.release(); - return; + + idle_node.data.wait(); + continue; } - var idle_node = IdleQueue.Node{ .data = .{} }; - self.idle_queue.prepend(&idle_node); held.release(); - idle_node.data.wait(); + return; } } diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 2c1b49224b..e5d4e600e2 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -5,11 +5,10 @@ // and substantial portions of the software. const std = @import("std"); const WaitGroup = @This(); -const Event = @import("Event.zig"); lock: std.Mutex = .{}, counter: usize = 0, -event: ?*Event = null, +event: ?*std.ResetEvent = null, pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -19,28 +18,33 @@ pub fn start(self: *WaitGroup) void { } pub fn stop(self: *WaitGroup) void { - var event: ?*Event = null; - defer if (event) |waiter| - waiter.set(); - const held = self.lock.acquire(); defer held.release(); self.counter -= 1; - if (self.counter == 0) - std.mem.swap(?*Event, &self.event, &event); + + if (self.counter == 0) { + if (self.event) |event| { + self.event = null; + event.set(); + } + } } pub fn wait(self: *WaitGroup) void { - var event = Event{}; - var has_event = false; - defer if (has_event) - event.wait(); - const held = self.lock.acquire(); - defer held.release(); - has_event = self.counter != 0; - if (has_event) - self.event = &event; + if (self.counter == 0) { + held.release(); + return; + } + + var event = std.ResetEvent.init(); + defer event.deinit(); + + std.debug.assert(self.event == null); + self.event = &event; + + held.release(); + event.wait(); } -- cgit v1.2.3 From b2e1bce2405cc4ff15d660f788db1aed35c890d5 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 18:24:20 -0700 Subject: minor code readability changes --- lib/std/reset_event.zig | 3 ++- 
src/ThreadPool.zig | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'src/ThreadPool.zig') diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index cdbad71c75..7df797f955 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -136,7 +136,8 @@ const PosixEvent = struct { } fn set(self: *PosixEvent) void { - assert(c.sem_post(self.getInitializedSem()) == 0); + const sem = self.getInitializedSem(); + assert(c.sem_post(sem) == 0); } fn wait(self: *PosixEvent) void { diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 71c72fb8da..cf9c02fa59 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -105,12 +105,12 @@ fn runWorker(self: *ThreadPool) void { if (self.is_running) { var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; - defer idle_node.data.deinit(); self.idle_queue.prepend(&idle_node); held.release(); idle_node.data.wait(); + idle_node.data.deinit(); continue; } -- cgit v1.2.3 From 177377b6e356b34bbed40cadca596658d158af6b Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 23 Dec 2020 16:57:18 -0800 Subject: rework std.ResetEvent, improve std lib Darwin integration * split std.ResetEvent into: - ResetEvent - requires init() at runtime and it can fail. Also requires deinit(). - StaticResetEvent - can be statically initialized and requires no deinitialization. Initialization cannot fail. * the POSIX sem_t implementation can in fact fail on initialization because it is allowed to be implemented as a file descriptor. * Completely define, clarify, and explain in detail the semantics of these APIs. Remove the `isSet` function. * `ResetEvent.timedWait` returns an enum instead of a possible error. * `ResetEvent.init` takes a pointer to the ResetEvent instead of returning a copy. * On Darwin, `ResetEvent` is implemented using Grand Central Dispatch, which is exposed by libSystem. stage2 changes: * ThreadPool: use a single, pre-initialized `ResetEvent` per worker. 
* WaitGroup: now requires init() and deinit() and init() can fail. - Add a `reset` function. - Compilation initializes one for the work queue in creation and re-uses it for every update. - Rename `stop` to `finish`. - Simplify the implementation based on the usage pattern. --- CMakeLists.txt | 5 +- lib/std/ResetEvent.zig | 297 +++++++++++++++++++++++++ lib/std/StaticResetEvent.zig | 396 ++++++++++++++++++++++++++++++++++ lib/std/c/darwin.zig | 12 ++ lib/std/debug.zig | 3 +- lib/std/fs/test.zig | 2 - lib/std/mutex.zig | 4 +- lib/std/reset_event.zig | 501 ------------------------------------------- lib/std/std.zig | 3 +- src/Compilation.zig | 18 +- src/ThreadPool.zig | 96 +++++---- src/WaitGroup.zig | 35 +-- 12 files changed, 805 insertions(+), 567 deletions(-) create mode 100644 lib/std/ResetEvent.zig create mode 100644 lib/std/StaticResetEvent.zig delete mode 100644 lib/std/reset_event.zig (limited to 'src/ThreadPool.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index 272cdc6921..b9b9310946 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -410,11 +410,12 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/os/windows/bits.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" + "${CMAKE_SOURCE_DIR}/lib/std/ResetEvent.zig" + "${CMAKE_SOURCE_DIR}/lib/std/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/pdb.zig" "${CMAKE_SOURCE_DIR}/lib/std/process.zig" - "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" "${CMAKE_SOURCE_DIR}/lib/std/rand.zig" - "${CMAKE_SOURCE_DIR}/lib/std/reset_event.zig" "${CMAKE_SOURCE_DIR}/lib/std/sort.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/addXf3.zig" diff --git a/lib/std/ResetEvent.zig b/lib/std/ResetEvent.zig new file mode 100644 index 0000000000..cd62eb6e21 --- /dev/null +++ b/lib/std/ResetEvent.zig @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 
2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. +//! If you need an abstraction that cannot fail to be initialized, see +//! `std.StaticResetEvent`. However if you can handle initialization failure, +//! it is preferred to use `ResetEvent`. + +const ResetEvent = @This(); +const std = @import("std.zig"); +const builtin = std.builtin; +const testing = std.testing; +const assert = std.debug.assert; +const c = std.c; +const os = std.os; +const time = std.time; + +impl: Impl, + +pub const Impl = if (builtin.single_threaded) + std.StaticResetEvent.DebugEvent +else if (std.Target.current.isDarwin()) + DarwinEvent +else if (std.Thread.use_pthreads) + PosixEvent +else + std.StaticResetEvent.AtomicEvent; + +pub const InitError = error{SystemResources}; + +/// After `init`, it is legal to call any other function. +pub fn init(ev: *ResetEvent) InitError!void { + return ev.impl.init(); +} + +/// This function is not thread-safe. +/// After `deinit`, the only legal function to call is `init`. +pub fn deinit(ev: *ResetEvent) void { + return ev.impl.deinit(); +} + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *ResetEvent) void { + return ev.impl.set(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. 
It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *ResetEvent) void { + return ev.impl.reset(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn wait(ev: *ResetEvent) void { + return ev.impl.wait(); +} + +pub const TimedWaitResult = enum { event_set, timed_out }; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// Apple has decided to not support POSIX semaphores, so we go with a +/// different approach using Grand Central Dispatch. This API is exposed +/// by libSystem so it is guaranteed to be available on all Darwin platforms. +pub const DarwinEvent = struct { + sem: c.dispatch_semaphore_t = undefined, + + pub fn init(ev: *DarwinEvent) !void { + ev.* = .{ + .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, + }; + } + + pub fn deinit(ev: *DarwinEvent) void { + c.dispatch_release(ev.sem); + ev.* = undefined; + } + + pub fn set(ev: *DarwinEvent) void { + // Empirically this returns the numerical value of the semaphore. 
+ _ = c.dispatch_semaphore_signal(ev.sem); + } + + pub fn wait(ev: *DarwinEvent) void { + assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + } + + pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { + const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); + if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { + return .timed_out; + } else { + return .event_set; + } + } + + pub fn reset(ev: *DarwinEvent) void { + // Keep calling until the semaphore goes back down to 0. + while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + } +}; + +/// POSIX semaphores must be initialized at runtime because they are allowed to +/// be implemented as file descriptors, in which case initialization would require +/// a syscall to open the fd. +pub const PosixEvent = struct { + sem: c.sem_t = undefined, + + pub fn init(ev: *PosixEvent) !void { + switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { + 0 => return, + else => return error.SystemResources, + } + } + + pub fn deinit(ev: *PosixEvent) void { + assert(c.sem_destroy(&ev.sem) == 0); + ev.* = undefined; + } + + pub fn set(ev: *PosixEvent) void { + assert(c.sem_post(&ev.sem) == 0); + } + + pub fn wait(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&ev.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } + } + } + + pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { + var ts: os.timespec = undefined; + var timeout_abs = timeout_ns; + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { + 0 => return .event_set, + c.EINTR => 
continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return .timed_out, + else => unreachable, + } + } + } + + pub fn reset(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_trywait(&ev.sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. + else => unreachable, + } + } + } +}; + +test "basic usage" { + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128, + in: ResetEvent, + out: ResetEvent, + + fn init(self: *Self) !void { + self.* = .{ + .value = 0, + .in = undefined, + .out = undefined, + }; + try self.in.init(); + try self.out.init(); + } + + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 
5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context: Context = undefined; + try context.init(); + defer context.deinit(); + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/StaticResetEvent.zig b/lib/std/StaticResetEvent.zig new file mode 100644 index 0000000000..b41e7666ac --- /dev/null +++ b/lib/std/StaticResetEvent.zig @@ -0,0 +1,396 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API is statically initializable. It cannot fail to be initialized +//! and it requires no deinitialization. The downside is that it may not +//! integrate as cleanly into other synchronization APIs, or, in a worst case, +//! may be forced to fall back on spin locking. As a rule of thumb, prefer +//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when +//! the logic needs stronger API guarantees. 
+ +const std = @import("std.zig"); +const StaticResetEvent = @This(); +const SpinLock = std.SpinLock; +const assert = std.debug.assert; +const os = std.os; +const time = std.time; +const linux = std.os.linux; +const windows = std.os.windows; +const testing = std.testing; + +impl: Impl = .{}, + +pub const Impl = if (std.builtin.single_threaded) + DebugEvent +else + AtomicEvent; + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *StaticResetEvent) void { + return ev.impl.set(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn wait(ev: *StaticResetEvent) void { + return ev.impl.wait(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *StaticResetEvent) void { + return ev.impl.reset(); +} + +pub const TimedWaitResult = std.ResetEvent.TimedWaitResult; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `timedWait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// For single-threaded builds, we use this to detect deadlocks. +/// In unsafe modes this ends up being no-ops. 
+pub const DebugEvent = struct { + state: State = State.unset, + + const State = enum { + unset, + set, + waited, + }; + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *DebugEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn deinit(ev: *DebugEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *DebugEvent) void { + switch (ev.state) { + .unset => ev.state = .set, + .set => {}, + .waited => unreachable, // Not allowed to call `set` until `reset`. + } + } + + pub fn wait(ev: *DebugEvent) void { + switch (ev.state) { + .unset => unreachable, // Deadlock detected. + .set => return, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + switch (ev.state) { + .unset => return .timed_out, + .set => return .event_set, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn reset(ev: *DebugEvent) void { + ev.state = .unset; + } +}; + +pub const AtomicEvent = struct { + waiters: u32 = 0, + + const WAKE = 1 << 0; + const WAIT = 1 << 1; + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *AtomicEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. 
+ pub fn deinit(ev: *AtomicEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *AtomicEvent) void { + const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); + if (waiters >= WAIT) { + return Futex.wake(&ev.waiters, waiters >> 1); + } + } + + pub fn wait(ev: *AtomicEvent) void { + switch (ev.timedWait(null)) { + .timed_out => unreachable, + .event_set => return, + } + } + + pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { + var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); + while (waiters != WAKE) { + waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { + if (Futex.wait(&ev.waiters, timeout)) |_| { + return .event_set; + } else |_| { + return .timed_out; + } + }; + } + return .event_set; + } + + pub fn reset(ev: *AtomicEvent) void { + @atomicStore(u32, &ev.waiters, 0, .Monotonic); + } + + pub const Futex = switch (std.Target.current.os.tag) { + .windows => WindowsFutex, + .linux => LinuxFutex, + else => SpinFutex, + }; + + pub const SpinFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void {} + + fn wait(waiters: *u32, timeout: ?u64) !void { + var timer: time.Timer = undefined; + if (timeout != null) + timer = time.Timer.start() catch return error.TimedOut; + + while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { + SpinLock.yield(); + if (timeout) |timeout_ns| { + if (timer.read() >= timeout_ns) + return error.TimedOut; + } + } + } + }; + + pub const LinuxFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void { + const waiting = std.math.maxInt(i32); // wake_count + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); + assert(linux.getErrno(rc) == 0); + } + + fn wait(waiters: *u32, timeout: ?u64) !void { + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); 
+ ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (true) { + const waiting = @atomicLoad(u32, waiters, .Acquire); + if (waiting == WAKE) + return; + const expected = @intCast(i32, waiting); + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); + switch (linux.getErrno(rc)) { + 0 => continue, + os.ETIMEDOUT => return error.TimedOut, + os.EINTR => continue, + os.EAGAIN => return, + else => unreachable, + } + } + } + }; + + pub const WindowsFutex = struct { + pub fn wake(waiters: *u32, wake_count: u32) void { + const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); + const key = @ptrCast(*const c_void, waiters); + + var waiting = wake_count; + while (waiting != 0) : (waiting -= 1) { + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .SUCCESS); + } + } + + pub fn wait(waiters: *u32, timeout: ?u64) !void { + const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); + const key = @ptrCast(*const c_void, waiters); + + // NT uses timeouts in units of 100ns with negative value being relative + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); + } + + // NtWaitForKeyedEvent doesnt have spurious wake-ups + var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + switch (rc) { + .TIMEOUT => { + // update the wait count to signal that we're not waiting anymore. + // if the .set() thread already observed that we are, perform a + // matching NtWaitForKeyedEvent so that the .set() thread doesn't + // deadlock trying to run NtReleaseKeyedEvent above. 
+ var waiting = @atomicLoad(u32, waiters, .Monotonic); + while (true) { + if (waiting == WAKE) { + rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .WAIT_0); + break; + } else { + waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; + continue; + } + } + return error.TimedOut; + }, + .WAIT_0 => {}, + else => unreachable, + } + } + + var event_handle: usize = EMPTY; + const EMPTY = ~@as(usize, 0); + const LOADING = EMPTY - 1; + + pub fn getEventHandle() ?windows.HANDLE { + var handle = @atomicLoad(usize, &event_handle, .Monotonic); + while (true) { + switch (handle) { + EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { + const handle_ptr = @ptrCast(*windows.HANDLE, &handle); + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) + handle = 0; + @atomicStore(usize, &event_handle, handle, .Monotonic); + return @intToPtr(?windows.HANDLE, handle); + }, + LOADING => { + SpinLock.yield(); + handle = @atomicLoad(usize, &event_handle, .Monotonic); + }, + else => { + return @intToPtr(?windows.HANDLE, handle); + }, + } + } + } + }; +}; + +test "basic usage" { + var event = StaticResetEvent{}; + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (std.builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128 = 0, + in: StaticResetEvent = .{}, + out: StaticResetEvent = .{}, + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and 
signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context = Context{}; + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig index 9c5056d9c4..0850464567 100644 --- a/lib/std/c/darwin.zig +++ b/lib/std/c/darwin.zig @@ -187,3 +187,15 @@ pub const pthread_attr_t = extern struct { }; pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void; + +// Grand Central Dispatch is exposed by libSystem. 
+pub const dispatch_semaphore_t = *opaque{}; +pub const dispatch_time_t = u64; +pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0); +pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0); +pub extern "c" fn dispatch_semaphore_create(value: isize) ?dispatch_semaphore_t; +pub extern "c" fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize; +pub extern "c" fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize; + +pub extern "c" fn dispatch_release(object: *c_void) void; +pub extern "c" fn dispatch_time(when: dispatch_time_t, delta: i64) dispatch_time_t; diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 7284237cb2..56428075bf 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -274,9 +274,8 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c // and call abort() // Sleep forever without hammering the CPU - var event = std.ResetEvent.init(); + var event: std.StaticResetEvent = .{}; event.wait(); - unreachable; } }, diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 61df39be0c..06e5bfe66b 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -771,8 +771,6 @@ test "open file with exclusive lock twice, make sure it waits" { std.time.sleep(SLEEP_TIMEOUT_NS); if (timer.read() >= SLEEP_TIMEOUT_NS) break; } - // Check that createFile is still waiting for the lock to be released. - testing.expect(!evt.isSet()); file.close(); // No timeout to avoid failures on heavily loaded systems. 
evt.wait(); diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig index 349a250fea..6fa8fb6a62 100644 --- a/lib/std/mutex.zig +++ b/lib/std/mutex.zig @@ -284,7 +284,7 @@ const WindowsMutex = struct { fn acquireSlow(self: *WindowsMutex) Held { // try to use NT keyed events for blocking, falling back to spinlock if unavailable @setCold(true); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning(); + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); const key = @ptrCast(*const c_void, &self.state.waiters); while (true) : (SpinLock.loopHint(1)) { @@ -312,7 +312,7 @@ const WindowsMutex = struct { pub fn release(self: Held) void { // unlock without a rmw/cmpxchg instruction @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return; + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return; const key = @ptrCast(*const c_void, &self.mutex.state.waiters); while (true) : (SpinLock.loopHint(1)) { diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig deleted file mode 100644 index 58dc40994d..0000000000 --- a/lib/std/reset_event.zig +++ /dev/null @@ -1,501 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2020 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const SpinLock = std.SpinLock; -const assert = std.debug.assert; -const c = std.c; -const os = std.os; -const time = std.time; -const linux = os.linux; -const windows = os.windows; - -/// A resource object which supports blocking until signaled. -/// Once finished, the `deinit()` method should be called for correctness. 
-pub const ResetEvent = struct { - os_event: OsEvent, - - pub const OsEvent = if (builtin.single_threaded) - DebugEvent - else if (std.Thread.use_pthreads) - PosixEvent - else - AtomicEvent; - - pub fn init() ResetEvent { - return ResetEvent{ .os_event = OsEvent.init() }; - } - - pub fn deinit(self: *ResetEvent) void { - self.os_event.deinit(); - } - - /// When `wait` would return without blocking, this returns `true`. - /// Note that the value may be immediately invalid upon this function's - /// return, because another thread may call `wait` in between, changing - /// the event's set/cleared status. - pub fn isSet(self: *ResetEvent) bool { - return self.os_event.isSet(); - } - - /// Sets the event if not already set and - /// wakes up all the threads waiting on the event. - pub fn set(self: *ResetEvent) void { - return self.os_event.set(); - } - - /// Resets the event to its original, unset state. - /// TODO improve these docs: - /// * under what circumstances does it make sense to call this function? - pub fn reset(self: *ResetEvent) void { - return self.os_event.reset(); - } - - /// Wait for the event to be set by blocking the current thread. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? - pub fn wait(self: *ResetEvent) void { - return self.os_event.wait(); - } - - /// Wait for the event to be set by blocking the current thread. - /// A timeout in nanoseconds can be provided as a hint for how - /// long the thread should block on the unset event before throwing error.TimedOut. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? 
- pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void { - return self.os_event.timedWait(timeout_ns); - } -}; - -const DebugEvent = struct { - is_set: bool, - - fn init() DebugEvent { - return DebugEvent{ .is_set = false }; - } - - fn deinit(self: *DebugEvent) void { - self.* = undefined; - } - - fn isSet(self: *DebugEvent) bool { - return self.is_set; - } - - fn reset(self: *DebugEvent) void { - self.is_set = false; - } - - fn set(self: *DebugEvent) void { - self.is_set = true; - } - - fn wait(self: *DebugEvent) void { - if (self.is_set) - return; - - @panic("deadlock detected"); - } - - fn timedWait(self: *DebugEvent, timeout: u64) !void { - if (self.is_set) - return; - - return error.TimedOut; - } -}; - -const PosixEvent = struct { - sem: c.sem_t = undefined, - /// Sadly this is needed because pthreads semaphore API does not - /// support static initialization. - init_mutex: std.mutex.PthreadMutex = .{}, - state: enum { uninit, init } = .uninit, - - fn init() PosixEvent { - return .{}; - } - - /// Not thread-safe. - fn deinit(self: *PosixEvent) void { - switch (self.state) { - .uninit => {}, - .init => { - assert(c.sem_destroy(&self.sem) == 0); - }, - } - self.* = undefined; - } - - fn isSet(self: *PosixEvent) bool { - const sem = self.getInitializedSem(); - var val: c_int = undefined; - assert(c.sem_getvalue(sem, &val) == 0); - return val > 0; - } - - fn reset(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_trywait(sem))) { - 0 => continue, // Need to make it go to zero. - c.EINTR => continue, - c.EINVAL => unreachable, - c.EAGAIN => return, // The semaphore currently has the value zero. 
- else => unreachable, - } - } - } - - fn set(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - assert(c.sem_post(sem) == 0); - } - - fn wait(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_wait(sem))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - else => unreachable, - } - } - } - - fn timedWait(self: *PosixEvent, timeout_ns: u64) !void { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - if (comptime std.Target.current.isDarwin()) { - var tv: os.darwin.timeval = undefined; - assert(os.darwin.gettimeofday(&tv, null) == 0); - timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us; - } else { - os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return error.TimedOut; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - } - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_timedwait(&self.sem, &ts))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - c.ETIMEDOUT => return error.TimedOut, - else => unreachable, - } - } - } - - fn getInitializedSem(self: *PosixEvent) *c.sem_t { - const held = self.init_mutex.acquire(); - defer held.release(); - - switch (self.state) { - .init => return &self.sem, - .uninit => { - self.state = .init; - assert(c.sem_init(&self.sem, 0, 0) == 0); - return &self.sem; - }, - } - } -}; - -const AtomicEvent = struct { - waiters: u32, - - const WAKE = 1 << 0; - const WAIT = 1 << 1; - - fn init() AtomicEvent { - return AtomicEvent{ .waiters = 0 }; - } - - fn deinit(self: *AtomicEvent) void { - self.* = undefined; - } - - fn isSet(self: *const AtomicEvent) bool { - return @atomicLoad(u32, &self.waiters, 
.Acquire) == WAKE; - } - - fn reset(self: *AtomicEvent) void { - @atomicStore(u32, &self.waiters, 0, .Monotonic); - } - - fn set(self: *AtomicEvent) void { - const waiters = @atomicRmw(u32, &self.waiters, .Xchg, WAKE, .Release); - if (waiters >= WAIT) { - return Futex.wake(&self.waiters, waiters >> 1); - } - } - - fn wait(self: *AtomicEvent) void { - return self.timedWait(null) catch unreachable; - } - - fn timedWait(self: *AtomicEvent, timeout: ?u64) !void { - var waiters = @atomicLoad(u32, &self.waiters, .Acquire); - while (waiters != WAKE) { - waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse return Futex.wait(&self.waiters, timeout); - } - } - - pub const Futex = switch (builtin.os.tag) { - .windows => WindowsFutex, - .linux => LinuxFutex, - else => SpinFutex, - }; - - const SpinFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void {} - - fn wait(waiters: *u32, timeout: ?u64) !void { - // TODO: handle platforms where a monotonic timer isnt available - var timer: time.Timer = undefined; - if (timeout != null) - timer = time.Timer.start() catch unreachable; - - while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { - SpinLock.yield(); - if (timeout) |timeout_ns| { - if (timer.read() >= timeout_ns) - return error.TimedOut; - } - } - } - }; - - const LinuxFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void { - const waiting = std.math.maxInt(i32); // wake_count - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); - assert(linux.getErrno(rc) == 0); - } - - fn wait(waiters: *u32, timeout: ?u64) !void { - var ts: linux.timespec = undefined; - var ts_ptr: ?*linux.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); - } - - while (true) { - const waiting = @atomicLoad(u32, waiters, .Acquire); - if 
(waiting == WAKE) - return; - const expected = @intCast(i32, waiting); - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); - switch (linux.getErrno(rc)) { - 0 => continue, - os.ETIMEDOUT => return error.TimedOut, - os.EINTR => continue, - os.EAGAIN => return, - else => unreachable, - } - } - } - }; - - const WindowsFutex = struct { - pub fn wake(waiters: *u32, wake_count: u32) void { - const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); - const key = @ptrCast(*const c_void, waiters); - - var waiting = wake_count; - while (waiting != 0) : (waiting -= 1) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - } - } - - pub fn wait(waiters: *u32, timeout: ?u64) !void { - const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); - const key = @ptrCast(*const c_void, waiters); - - // NT uses timeouts in units of 100ns with negative value being relative - var timeout_ptr: ?*windows.LARGE_INTEGER = null; - var timeout_value: windows.LARGE_INTEGER = undefined; - if (timeout) |timeout_ns| { - timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); - } - - // NtWaitForKeyedEvent doesnt have spurious wake-ups - var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - switch (rc) { - .TIMEOUT => { - // update the wait count to signal that we're not waiting anymore. - // if the .set() thread already observed that we are, perform a - // matching NtWaitForKeyedEvent so that the .set() thread doesn't - // deadlock trying to run NtReleaseKeyedEvent above. 
- var waiting = @atomicLoad(u32, waiters, .Monotonic); - while (true) { - if (waiting == WAKE) { - rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .WAIT_0); - break; - } else { - waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; - continue; - } - } - return error.TimedOut; - }, - .WAIT_0 => {}, - else => unreachable, - } - } - - var event_handle: usize = EMPTY; - const EMPTY = ~@as(usize, 0); - const LOADING = EMPTY - 1; - - pub fn getEventHandle() ?windows.HANDLE { - var handle = @atomicLoad(usize, &event_handle, .Monotonic); - while (true) { - switch (handle) { - EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { - const handle_ptr = @ptrCast(*windows.HANDLE, &handle); - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) - handle = 0; - @atomicStore(usize, &event_handle, handle, .Monotonic); - return @intToPtr(?windows.HANDLE, handle); - }, - LOADING => { - SpinLock.yield(); - handle = @atomicLoad(usize, &event_handle, .Monotonic); - }, - else => { - return @intToPtr(?windows.HANDLE, handle); - }, - } - } - } - }; -}; - -test "ResetEvent" { - var event = ResetEvent.init(); - defer event.deinit(); - - // test event setting - testing.expect(!event.isSet()); - event.set(); - testing.expect(event.isSet()); - - // test event resetting - event.reset(); - testing.expect(!event.isSet()); - - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); - - event.set(); - try event.timedWait(1); - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - const Self = @This(); - - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init() Self { - return Self{ - .value = 0, - .in = ResetEvent.init(), - .out = ResetEvent.init(), - }; - } - - fn deinit(self: *Self) 
void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; - } - - fn sender(self: *Self) void { - // update value and signal input - testing.expect(self.value == 0); - self.value = 1; - self.in.set(); - - // wait for receiver to update value and signal output - self.out.wait(); - testing.expect(self.value == 2); - - // update value and signal final input - self.value = 3; - self.in.set(); - } - - fn receiver(self: *Self) void { - // wait for sender to update value and signal input - self.in.wait(); - assert(self.value == 1); - - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); - - // wait for sender to update value and signal final input - self.in.wait(); - assert(self.value == 3); - } - - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); - } - - fn timedWaiter(self: *Self) !void { - self.in.wait(); - testing.expectError(error.TimedOut, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - testing.expect(self.value == 5); - } - }; - - var context = Context.init(); - defer context.deinit(); - const receiver = try std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. 
- // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(&timed, Context.sleeper); - defer sleeper.wait(); - try timed.timedWaiter(); - } -} diff --git a/lib/std/std.zig b/lib/std/std.zig index 5fbf2662b9..69f4ea671b 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -30,10 +30,11 @@ pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("Progress.zig"); -pub const ResetEvent = @import("reset_event.zig").ResetEvent; +pub const ResetEvent = @import("ResetEvent.zig"); pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; pub const SpinLock = @import("spinlock.zig").SpinLock; +pub const StaticResetEvent = @import("StaticResetEvent.zig"); pub const StringHashMap = hash_map.StringHashMap; pub const StringHashMapUnmanaged = hash_map.StringHashMapUnmanaged; pub const StringArrayHashMap = array_hash_map.StringArrayHashMap; diff --git a/src/Compilation.zig b/src/Compilation.zig index 11521e5a52..d172cbadcc 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -135,6 +135,8 @@ emit_docs: ?EmitLoc, c_header: ?c_link.Header, +work_queue_wait_group: WaitGroup, + pub const InnerError = Module.InnerError; pub const CRTFile = struct { @@ -1006,11 +1008,15 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .test_filter = options.test_filter, .test_name_prefix = options.test_name_prefix, .test_evented_io = options.test_evented_io, + .work_queue_wait_group = undefined, }; break :comp comp; }; errdefer comp.destroy(); + try comp.work_queue_wait_group.init(); + errdefer comp.work_queue_wait_group.deinit(); + if (comp.bin_file.options.module) |mod| { try comp.work_queue.writeItem(.{ 
.generate_builtin_zig = {} }); } @@ -1191,6 +1197,8 @@ pub fn destroy(self: *Compilation) void { self.cache_parent.manifest_dir.close(); if (self.owned_link_dir) |*dir| dir.close(); + self.work_queue_wait_group.deinit(); + // This destroys `self`. self.arena_state.promote(gpa).deinit(); } @@ -1405,13 +1413,13 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var arena = std.heap.ArenaAllocator.init(self.gpa); defer arena.deinit(); - var wg = WaitGroup{}; - defer wg.wait(); + self.work_queue_wait_group.reset(); + defer self.work_queue_wait_group.wait(); while (self.c_object_work_queue.readItem()) |c_object| { - wg.start(); + self.work_queue_wait_group.start(); try self.thread_pool.spawn(workerUpdateCObject, .{ - self, c_object, &c_comp_progress_node, &wg, + self, c_object, &c_comp_progress_node, &self.work_queue_wait_group, }); } @@ -1764,7 +1772,7 @@ fn workerUpdateCObject( progress_node: *std.Progress.Node, wg: *WaitGroup, ) void { - defer wg.stop(); + defer wg.finish(); comp.updateCObject(c_object, progress_node) catch |err| switch (err) { error.AnalysisFail => return, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index cf9c02fa59..1e91d3f731 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -9,8 +9,7 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -spawned: usize = 0, -threads: []*std.Thread, +workers: []Worker, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, @@ -20,23 +19,69 @@ const Runnable = struct { runFn: fn (*Runnable) void, }; +const Worker = struct { + pool: *ThreadPool, + thread: *std.Thread, + /// The node is for this worker only and must have an already initialized event + /// when the thread is spawned. 
+ idle_node: IdleQueue.Node, + + fn run(worker: *Worker) void { + while (true) { + const held = worker.pool.lock.acquire(); + + if (worker.pool.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (worker.pool.is_running) { + worker.idle_node.data.reset(); + + worker.pool.idle_queue.prepend(&worker.idle_node); + held.release(); + + worker.idle_node.data.wait(); + continue; + } + + held.release(); + return; + } + } +}; + pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { self.* = .{ .allocator = allocator, - .threads = &[_]*std.Thread{}, + .workers = &[_]Worker{}, }; if (std.builtin.single_threaded) return; - errdefer self.deinit(); + const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1); + self.workers = try allocator.alloc(Worker, worker_count); + errdefer allocator.free(self.workers); + + var worker_index: usize = 0; + errdefer self.destroyWorkers(worker_index); + while (worker_index < worker_count) : (worker_index += 1) { + const worker = &self.workers[worker_index]; + worker.pool = self; - var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); - self.threads = try allocator.alloc(*std.Thread, num_threads); + // Each worker requires its ResetEvent to be pre-initialized. 
+ try worker.idle_node.data.init(); + errdefer worker.idle_node.data.deinit(); + + worker.thread = try std.Thread.spawn(worker, Worker.run); + } +} - while (num_threads > 0) : (num_threads -= 1) { - const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.spawned] = thread; - self.spawned += 1; +fn destroyWorkers(self: *ThreadPool, spawned: usize) void { + for (self.workers[0..spawned]) |*worker| { + worker.thread.wait(); + worker.idle_node.data.deinit(); } } @@ -50,9 +95,8 @@ pub fn deinit(self: *ThreadPool) void { idle_node.data.set(); } - defer self.allocator.free(self.threads); - for (self.threads[0..self.spawned]) |thread| - thread.wait(); + self.destroyWorkers(self.workers.len); + self.allocator.free(self.workers); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { @@ -92,29 +136,3 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (self.idle_queue.popFirst()) |idle_node| idle_node.data.set(); } - -fn runWorker(self: *ThreadPool) void { - while (true) { - const held = self.lock.acquire(); - - if (self.run_queue.popFirst()) |run_node| { - held.release(); - (run_node.data.runFn)(&run_node.data); - continue; - } - - if (self.is_running) { - var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; - - self.idle_queue.prepend(&idle_node); - held.release(); - - idle_node.data.wait(); - idle_node.data.deinit(); - continue; - } - - held.release(); - return; - } -} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 6a0b12d050..bd6274c10a 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -8,7 +8,21 @@ const WaitGroup = @This(); lock: std.Mutex = .{}, counter: usize = 0, -event: ?*std.ResetEvent = null, +event: std.ResetEvent, + +pub fn init(self: *WaitGroup) !void { + self.* = .{ + .lock = .{}, + .counter = 0, + .event = undefined, + }; + try self.event.init(); +} + +pub fn deinit(self: *WaitGroup) void { + self.event.deinit(); + self.* = undefined; +} pub fn 
start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -17,17 +31,14 @@ pub fn start(self: *WaitGroup) void { self.counter += 1; } -pub fn stop(self: *WaitGroup) void { +pub fn finish(self: *WaitGroup) void { const held = self.lock.acquire(); defer held.release(); self.counter -= 1; if (self.counter == 0) { - if (self.event) |event| { - self.event = null; - event.set(); - } + self.event.set(); } } @@ -40,13 +51,11 @@ pub fn wait(self: *WaitGroup) void { return; } - var event = std.ResetEvent.init(); - defer event.deinit(); - - std.debug.assert(self.event == null); - self.event = &event; - held.release(); - event.wait(); + self.event.wait(); } } + +pub fn reset(self: *WaitGroup) void { + self.event.reset(); +} -- cgit v1.2.3 From a9667b5a859a589056f23df2b74b91fede0bbbfa Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 14 Jan 2021 20:41:37 -0700 Subject: organize std lib concurrency primitives and add RwLock * move concurrency primitives that always operate on kernel threads to the std.Thread namespace * remove std.SpinLock. Nobody should use this in a non-freestanding environment; the other primitives are always preferable. In freestanding, it will be necessary to put custom spin logic in there, so there are no use cases for a std lib version. 
* move some std lib files to the top level fields convention * add std.Thread.spinLoopHint * add std.Thread.Condition * add std.Thread.Semaphore * new implementation of std.Thread.Mutex for Windows and non-pthreads Linux * add std.Thread.RwLock Implementations provided by @kprotty --- CMakeLists.txt | 11 +- lib/std/Progress.zig | 2 +- lib/std/ResetEvent.zig | 297 --------------- lib/std/SpinLock.zig | 86 ----- lib/std/StaticResetEvent.zig | 396 -------------------- lib/std/Thread.zig | 558 +++++++++++++++++++++++++++++ lib/std/Thread/AutoResetEvent.zig | 228 ++++++++++++ lib/std/Thread/Condition.zig | 182 ++++++++++ lib/std/Thread/Mutex.zig | 303 ++++++++++++++++ lib/std/Thread/ResetEvent.zig | 297 +++++++++++++++ lib/std/Thread/RwLock.zig | 308 ++++++++++++++++ lib/std/Thread/Semaphore.zig | 39 ++ lib/std/Thread/StaticResetEvent.zig | 396 ++++++++++++++++++++ lib/std/atomic/queue.zig | 4 +- lib/std/auto_reset_event.zig | 226 ------------ lib/std/c.zig | 7 + lib/std/c/darwin.zig | 6 +- lib/std/c/emscripten.zig | 3 + lib/std/c/freebsd.zig | 3 + lib/std/c/fuchsia.zig | 3 + lib/std/c/haiku.zig | 9 + lib/std/c/hermit.zig | 3 + lib/std/c/linux.zig | 26 ++ lib/std/c/netbsd.zig | 16 + lib/std/c/openbsd.zig | 3 + lib/std/debug.zig | 8 +- lib/std/event/lock.zig | 2 +- lib/std/event/loop.zig | 6 +- lib/std/event/wait_group.zig | 2 +- lib/std/fs/test.zig | 4 +- lib/std/heap/general_purpose_allocator.zig | 10 +- lib/std/mutex.zig | 379 -------------------- lib/std/once.zig | 2 +- lib/std/os/windows/bits.zig | 5 + lib/std/os/windows/kernel32.zig | 13 + lib/std/std.zig | 13 +- lib/std/thread.zig | 526 --------------------------- src/Compilation.zig | 2 +- src/ThreadPool.zig | 4 +- src/WaitGroup.zig | 4 +- 40 files changed, 2438 insertions(+), 1954 deletions(-) delete mode 100644 lib/std/ResetEvent.zig delete mode 100644 lib/std/SpinLock.zig delete mode 100644 lib/std/StaticResetEvent.zig create mode 100644 lib/std/Thread.zig create mode 100644 lib/std/Thread/AutoResetEvent.zig 
create mode 100644 lib/std/Thread/Condition.zig create mode 100644 lib/std/Thread/Mutex.zig create mode 100644 lib/std/Thread/ResetEvent.zig create mode 100644 lib/std/Thread/RwLock.zig create mode 100644 lib/std/Thread/Semaphore.zig create mode 100644 lib/std/Thread/StaticResetEvent.zig delete mode 100644 lib/std/auto_reset_event.zig delete mode 100644 lib/std/mutex.zig delete mode 100644 lib/std/thread.zig (limited to 'src/ThreadPool.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index 0dfec5a953..a0c3ae84fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -334,7 +334,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/atomic/int.zig" "${CMAKE_SOURCE_DIR}/lib/std/atomic/queue.zig" "${CMAKE_SOURCE_DIR}/lib/std/atomic/stack.zig" - "${CMAKE_SOURCE_DIR}/lib/std/auto_reset_event.zig" "${CMAKE_SOURCE_DIR}/lib/std/base64.zig" "${CMAKE_SOURCE_DIR}/lib/std/buf_map.zig" "${CMAKE_SOURCE_DIR}/lib/std/builtin.zig" @@ -409,7 +408,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/meta.zig" "${CMAKE_SOURCE_DIR}/lib/std/meta/trailer_flags.zig" "${CMAKE_SOURCE_DIR}/lib/std/meta/trait.zig" - "${CMAKE_SOURCE_DIR}/lib/std/mutex.zig" "${CMAKE_SOURCE_DIR}/lib/std/os.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/bits.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/bits/linux.zig" @@ -426,8 +424,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig" "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" - "${CMAKE_SOURCE_DIR}/lib/std/ResetEvent.zig" - "${CMAKE_SOURCE_DIR}/lib/std/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/pdb.zig" "${CMAKE_SOURCE_DIR}/lib/std/process.zig" "${CMAKE_SOURCE_DIR}/lib/std/rand.zig" @@ -494,7 +490,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/udivmodti4.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/udivti3.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/umodti3.zig" - "${CMAKE_SOURCE_DIR}/lib/std/SpinLock.zig" 
"${CMAKE_SOURCE_DIR}/lib/std/start.zig" "${CMAKE_SOURCE_DIR}/lib/std/std.zig" "${CMAKE_SOURCE_DIR}/lib/std/target.zig" @@ -513,7 +508,11 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/target/systemz.zig" "${CMAKE_SOURCE_DIR}/lib/std/target/wasm.zig" "${CMAKE_SOURCE_DIR}/lib/std/target/x86.zig" - "${CMAKE_SOURCE_DIR}/lib/std/thread.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Thread.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Thread/AutoResetEvent.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Thread/Mutex.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Thread/ResetEvent.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Thread/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/time.zig" "${CMAKE_SOURCE_DIR}/lib/std/unicode.zig" "${CMAKE_SOURCE_DIR}/lib/std/zig.zig" diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index ca9fb8ea1f..b75ad106a4 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -50,7 +50,7 @@ done: bool = true, /// Protects the `refresh` function, as well as `node.recently_updated_child`. /// Without this, callsites would call `Node.end` and then free `Node` memory /// while it was still being accessed by the `refresh` function. -update_lock: std.Mutex = .{}, +update_lock: std.Thread.Mutex = .{}, /// Keeps track of how many columns in the terminal have been output, so that /// we can move the cursor back later. diff --git a/lib/std/ResetEvent.zig b/lib/std/ResetEvent.zig deleted file mode 100644 index 4443fdcdfb..0000000000 --- a/lib/std/ResetEvent.zig +++ /dev/null @@ -1,297 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. - -//! A thread-safe resource which supports blocking until signaled. -//! This API is for kernel threads, not evented I/O. -//! This API requires being initialized at runtime, and initialization -//! can fail. 
Once initialized, the core operations cannot fail. -//! If you need an abstraction that cannot fail to be initialized, see -//! `std.StaticResetEvent`. However if you can handle initialization failure, -//! it is preferred to use `ResetEvent`. - -const ResetEvent = @This(); -const std = @import("std.zig"); -const builtin = std.builtin; -const testing = std.testing; -const assert = std.debug.assert; -const c = std.c; -const os = std.os; -const time = std.time; - -impl: Impl, - -pub const Impl = if (builtin.single_threaded) - std.StaticResetEvent.DebugEvent -else if (std.Target.current.isDarwin()) - DarwinEvent -else if (std.Thread.use_pthreads) - PosixEvent -else - std.StaticResetEvent.AtomicEvent; - -pub const InitError = error{SystemResources}; - -/// After `init`, it is legal to call any other function. -pub fn init(ev: *ResetEvent) InitError!void { - return ev.impl.init(); -} - -/// This function is not thread-safe. -/// After `deinit`, the only legal function to call is `init`. -pub fn deinit(ev: *ResetEvent) void { - return ev.impl.deinit(); -} - -/// Sets the event if not already set and wakes up all the threads waiting on -/// the event. It is safe to call `set` multiple times before calling `wait`. -/// However it is illegal to call `set` after `wait` is called until the event -/// is `reset`. This function is thread-safe. -pub fn set(ev: *ResetEvent) void { - return ev.impl.set(); -} - -/// Resets the event to its original, unset state. -/// This function is *not* thread-safe. It is equivalent to calling -/// `deinit` followed by `init` but without the possibility of failure. -pub fn reset(ev: *ResetEvent) void { - return ev.impl.reset(); -} - -/// Wait for the event to be set by blocking the current thread. -/// Thread-safe. No spurious wakeups. -/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. 
-pub fn wait(ev: *ResetEvent) void { - return ev.impl.wait(); -} - -pub const TimedWaitResult = enum { event_set, timed_out }; - -/// Wait for the event to be set by blocking the current thread. -/// A timeout in nanoseconds can be provided as a hint for how -/// long the thread should block on the unset event before returning -/// `TimedWaitResult.timed_out`. -/// Thread-safe. No precision of timing is guaranteed. -/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. -pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { - return ev.impl.timedWait(timeout_ns); -} - -/// Apple has decided to not support POSIX semaphores, so we go with a -/// different approach using Grand Central Dispatch. This API is exposed -/// by libSystem so it is guaranteed to be available on all Darwin platforms. -pub const DarwinEvent = struct { - sem: c.dispatch_semaphore_t = undefined, - - pub fn init(ev: *DarwinEvent) !void { - ev.* = .{ - .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, - }; - } - - pub fn deinit(ev: *DarwinEvent) void { - c.dispatch_release(ev.sem); - ev.* = undefined; - } - - pub fn set(ev: *DarwinEvent) void { - // Empirically this returns the numerical value of the semaphore. - _ = c.dispatch_semaphore_signal(ev.sem); - } - - pub fn wait(ev: *DarwinEvent) void { - assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); - } - - pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { - const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); - if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { - return .timed_out; - } else { - return .event_set; - } - } - - pub fn reset(ev: *DarwinEvent) void { - // Keep calling until the semaphore goes back down to 0. 
- while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} - } -}; - -/// POSIX semaphores must be initialized at runtime because they are allowed to -/// be implemented as file descriptors, in which case initialization would require -/// a syscall to open the fd. -pub const PosixEvent = struct { - sem: c.sem_t = undefined, - - pub fn init(ev: *PosixEvent) !void { - switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { - 0 => return, - else => return error.SystemResources, - } - } - - pub fn deinit(ev: *PosixEvent) void { - assert(c.sem_destroy(&ev.sem) == 0); - ev.* = undefined; - } - - pub fn set(ev: *PosixEvent) void { - assert(c.sem_post(&ev.sem) == 0); - } - - pub fn wait(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_wait(&ev.sem))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - else => unreachable, - } - } - } - - pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - while (true) { - switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { - 0 => return .event_set, - c.EINTR => continue, - c.EINVAL => unreachable, - c.ETIMEDOUT => return .timed_out, - else => unreachable, - } - } - } - - pub fn reset(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_trywait(&ev.sem))) { - 0 => continue, // Need to make it go to zero. - c.EINTR => continue, - c.EINVAL => unreachable, - c.EAGAIN => return, // The semaphore currently has the value zero. 
- else => unreachable, - } - } - } -}; - -test "basic usage" { - var event: ResetEvent = undefined; - try event.init(); - defer event.deinit(); - - // test event setting - event.set(); - - // test event resetting - event.reset(); - - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); - - event.set(); - testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - const Self = @This(); - - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init(self: *Self) !void { - self.* = .{ - .value = 0, - .in = undefined, - .out = undefined, - }; - try self.in.init(); - try self.out.init(); - } - - fn deinit(self: *Self) void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; - } - - fn sender(self: *Self) void { - // update value and signal input - testing.expect(self.value == 0); - self.value = 1; - self.in.set(); - - // wait for receiver to update value and signal output - self.out.wait(); - testing.expect(self.value == 2); - - // update value and signal final input - self.value = 3; - self.in.set(); - } - - fn receiver(self: *Self) void { - // wait for sender to update value and signal input - self.in.wait(); - assert(self.value == 1); - - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); - - // wait for sender to update value and signal final input - self.in.wait(); - assert(self.value == 3); - } - - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); - } - - fn timedWaiter(self: *Self) !void { - self.in.wait(); - testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - testing.expect(self.value == 5); - } - }; - - var context: Context = undefined; - try context.init(); - defer context.deinit(); - const receiver = try 
std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. - // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(&timed, Context.sleeper); - defer sleeper.wait(); - try timed.timedWaiter(); - } -} diff --git a/lib/std/SpinLock.zig b/lib/std/SpinLock.zig deleted file mode 100644 index a16cfa930b..0000000000 --- a/lib/std/SpinLock.zig +++ /dev/null @@ -1,86 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -//! A mutually exclusive lock that grinds the CPU rather than interacting with -//! the operating system. It does however yield to the OS scheduler while -//! spinning, when targeting an OS that supports it. -//! This struct can be initialized directly and statically initialized. The -//! default state is unlocked. 
- -state: State = State.Unlocked, - -const std = @import("std.zig"); -const builtin = @import("builtin"); -const SpinLock = @This(); - -const State = enum(u8) { - Unlocked, - Locked, -}; - -pub const Held = struct { - spinlock: *SpinLock, - - pub fn release(self: Held) void { - @atomicStore(State, &self.spinlock.state, .Unlocked, .Release); - } -}; - -pub fn tryAcquire(self: *SpinLock) ?Held { - return switch (@atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire)) { - .Unlocked => Held{ .spinlock = self }, - .Locked => null, - }; -} - -pub fn acquire(self: *SpinLock) Held { - while (true) { - return self.tryAcquire() orelse { - yield(); - continue; - }; - } -} - -pub fn yield() void { - // On native windows, SwitchToThread is too expensive, - // and yielding for 380-410 iterations was found to be - // a nice sweet spot. Posix systems on the other hand, - // especially linux, perform better by yielding the thread. - switch (builtin.os.tag) { - .windows => loopHint(400), - else => std.os.sched_yield() catch loopHint(1), - } -} - -/// Hint to the cpu that execution is spinning -/// for the given amount of iterations. -pub fn loopHint(iterations: usize) void { - var i = iterations; - while (i != 0) : (i -= 1) { - switch (builtin.arch) { - // these instructions use a memory clobber as they - // flush the pipeline of any speculated reads/writes. 
- .i386, .x86_64 => asm volatile ("pause" - : - : - : "memory" - ), - .arm, .aarch64 => asm volatile ("yield" - : - : - : "memory" - ), - else => std.os.sched_yield() catch {}, - } - } -} - -test "basic usage" { - var lock: SpinLock = .{}; - - const held = lock.acquire(); - defer held.release(); -} diff --git a/lib/std/StaticResetEvent.zig b/lib/std/StaticResetEvent.zig deleted file mode 100644 index 4e551a565e..0000000000 --- a/lib/std/StaticResetEvent.zig +++ /dev/null @@ -1,396 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. - -//! A thread-safe resource which supports blocking until signaled. -//! This API is for kernel threads, not evented I/O. -//! This API is statically initializable. It cannot fail to be initialized -//! and it requires no deinitialization. The downside is that it may not -//! integrate as cleanly into other synchronization APIs, or, in a worst case, -//! may be forced to fall back on spin locking. As a rule of thumb, prefer -//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when -//! the logic needs stronger API guarantees. - -const std = @import("std.zig"); -const StaticResetEvent = @This(); -const SpinLock = std.SpinLock; -const assert = std.debug.assert; -const os = std.os; -const time = std.time; -const linux = std.os.linux; -const windows = std.os.windows; -const testing = std.testing; - -impl: Impl = .{}, - -pub const Impl = if (std.builtin.single_threaded) - DebugEvent -else - AtomicEvent; - -/// Sets the event if not already set and wakes up all the threads waiting on -/// the event. It is safe to call `set` multiple times before calling `wait`. -/// However it is illegal to call `set` after `wait` is called until the event -/// is `reset`. This function is thread-safe. 
-pub fn set(ev: *StaticResetEvent) void { - return ev.impl.set(); -} - -/// Wait for the event to be set by blocking the current thread. -/// Thread-safe. No spurious wakeups. -/// Upon return from `wait`, the only function available to be called -/// in `StaticResetEvent` is `reset`. -pub fn wait(ev: *StaticResetEvent) void { - return ev.impl.wait(); -} - -/// Resets the event to its original, unset state. -/// This function is *not* thread-safe. It is equivalent to calling -/// `deinit` followed by `init` but without the possibility of failure. -pub fn reset(ev: *StaticResetEvent) void { - return ev.impl.reset(); -} - -pub const TimedWaitResult = std.ResetEvent.TimedWaitResult; - -/// Wait for the event to be set by blocking the current thread. -/// A timeout in nanoseconds can be provided as a hint for how -/// long the thread should block on the unset event before returning -/// `TimedWaitResult.timed_out`. -/// Thread-safe. No precision of timing is guaranteed. -/// Upon return from `timedWait`, the only function available to be called -/// in `StaticResetEvent` is `reset`. -pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { - return ev.impl.timedWait(timeout_ns); -} - -/// For single-threaded builds, we use this to detect deadlocks. -/// In unsafe modes this ends up being no-ops. -pub const DebugEvent = struct { - state: State = State.unset, - - const State = enum { - unset, - set, - waited, - }; - - /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. - pub fn init(ev: *DebugEvent) void { - ev.* = .{}; - } - - /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. - pub fn deinit(ev: *DebugEvent) void { - ev.* = undefined; - } - - pub fn set(ev: *DebugEvent) void { - switch (ev.state) { - .unset => ev.state = .set, - .set => {}, - .waited => unreachable, // Not allowed to call `set` until `reset`. 
- } - } - - pub fn wait(ev: *DebugEvent) void { - switch (ev.state) { - .unset => unreachable, // Deadlock detected. - .set => return, - .waited => unreachable, // Not allowed to call `wait` until `reset`. - } - } - - pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { - switch (ev.state) { - .unset => return .timed_out, - .set => return .event_set, - .waited => unreachable, // Not allowed to call `wait` until `reset`. - } - } - - pub fn reset(ev: *DebugEvent) void { - ev.state = .unset; - } -}; - -pub const AtomicEvent = struct { - waiters: u32 = 0, - - const WAKE = 1 << 0; - const WAIT = 1 << 1; - - /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. - pub fn init(ev: *AtomicEvent) void { - ev.* = .{}; - } - - /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. - pub fn deinit(ev: *AtomicEvent) void { - ev.* = undefined; - } - - pub fn set(ev: *AtomicEvent) void { - const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); - if (waiters >= WAIT) { - return Futex.wake(&ev.waiters, waiters >> 1); - } - } - - pub fn wait(ev: *AtomicEvent) void { - switch (ev.timedWait(null)) { - .timed_out => unreachable, - .event_set => return, - } - } - - pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { - var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); - while (waiters != WAKE) { - waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { - if (Futex.wait(&ev.waiters, timeout)) |_| { - return .event_set; - } else |_| { - return .timed_out; - } - }; - } - return .event_set; - } - - pub fn reset(ev: *AtomicEvent) void { - @atomicStore(u32, &ev.waiters, 0, .Monotonic); - } - - pub const Futex = switch (std.Target.current.os.tag) { - .windows => WindowsFutex, - .linux => LinuxFutex, - else => SpinFutex, - }; - - pub const SpinFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void {} - - fn wait(waiters: 
*u32, timeout: ?u64) !void { - var timer: time.Timer = undefined; - if (timeout != null) - timer = time.Timer.start() catch return error.TimedOut; - - while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { - SpinLock.yield(); - if (timeout) |timeout_ns| { - if (timer.read() >= timeout_ns) - return error.TimedOut; - } - } - } - }; - - pub const LinuxFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void { - const waiting = std.math.maxInt(i32); // wake_count - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); - assert(linux.getErrno(rc) == 0); - } - - fn wait(waiters: *u32, timeout: ?u64) !void { - var ts: linux.timespec = undefined; - var ts_ptr: ?*linux.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); - } - - while (true) { - const waiting = @atomicLoad(u32, waiters, .Acquire); - if (waiting == WAKE) - return; - const expected = @intCast(i32, waiting); - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); - switch (linux.getErrno(rc)) { - 0 => continue, - os.ETIMEDOUT => return error.TimedOut, - os.EINTR => continue, - os.EAGAIN => return, - else => unreachable, - } - } - } - }; - - pub const WindowsFutex = struct { - pub fn wake(waiters: *u32, wake_count: u32) void { - const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); - const key = @ptrCast(*const c_void, waiters); - - var waiting = wake_count; - while (waiting != 0) : (waiting -= 1) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - } - } - - pub fn wait(waiters: *u32, timeout: ?u64) !void { - const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); - const key = @ptrCast(*const c_void, 
waiters); - - // NT uses timeouts in units of 100ns with negative value being relative - var timeout_ptr: ?*windows.LARGE_INTEGER = null; - var timeout_value: windows.LARGE_INTEGER = undefined; - if (timeout) |timeout_ns| { - timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); - } - - // NtWaitForKeyedEvent doesnt have spurious wake-ups - var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - switch (rc) { - .TIMEOUT => { - // update the wait count to signal that we're not waiting anymore. - // if the .set() thread already observed that we are, perform a - // matching NtWaitForKeyedEvent so that the .set() thread doesn't - // deadlock trying to run NtReleaseKeyedEvent above. - var waiting = @atomicLoad(u32, waiters, .Monotonic); - while (true) { - if (waiting == WAKE) { - rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .WAIT_0); - break; - } else { - waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; - continue; - } - } - return error.TimedOut; - }, - .WAIT_0 => {}, - else => unreachable, - } - } - - var event_handle: usize = EMPTY; - const EMPTY = ~@as(usize, 0); - const LOADING = EMPTY - 1; - - pub fn getEventHandle() ?windows.HANDLE { - var handle = @atomicLoad(usize, &event_handle, .Monotonic); - while (true) { - switch (handle) { - EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { - const handle_ptr = @ptrCast(*windows.HANDLE, &handle); - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) - handle = 0; - @atomicStore(usize, &event_handle, handle, .Monotonic); - return @intToPtr(?windows.HANDLE, handle); - }, - LOADING => { - SpinLock.yield(); - handle = @atomicLoad(usize, &event_handle, .Monotonic); - }, - else => { - return 
@intToPtr(?windows.HANDLE, handle); - }, - } - } - } - }; -}; - -test "basic usage" { - var event = StaticResetEvent{}; - - // test event setting - event.set(); - - // test event resetting - event.reset(); - - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); - - event.set(); - testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); - - // test cross-thread signaling - if (std.builtin.single_threaded) - return; - - const Context = struct { - const Self = @This(); - - value: u128 = 0, - in: StaticResetEvent = .{}, - out: StaticResetEvent = .{}, - - fn sender(self: *Self) void { - // update value and signal input - testing.expect(self.value == 0); - self.value = 1; - self.in.set(); - - // wait for receiver to update value and signal output - self.out.wait(); - testing.expect(self.value == 2); - - // update value and signal final input - self.value = 3; - self.in.set(); - } - - fn receiver(self: *Self) void { - // wait for sender to update value and signal input - self.in.wait(); - assert(self.value == 1); - - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); - - // wait for sender to update value and signal final input - self.in.wait(); - assert(self.value == 3); - } - - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); - } - - fn timedWaiter(self: *Self) !void { - self.in.wait(); - testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - testing.expect(self.value == 5); - } - }; - - var context = Context{}; - const receiver = try std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. 
- // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(&timed, Context.sleeper); - defer sleeper.wait(); - try timed.timedWaiter(); - } -} diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig new file mode 100644 index 0000000000..f878f43539 --- /dev/null +++ b/lib/std/Thread.zig @@ -0,0 +1,558 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! This struct represents a kernel thread, and acts as a namespace for concurrency +//! primitives that operate on kernel threads. For concurrency primitives that support +//! both evented I/O and async I/O, see the respective names in the top level std namespace. + +data: Data, + +pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig"); +pub const ResetEvent = @import("Thread/ResetEvent.zig"); +pub const StaticResetEvent = @import("Thread/StaticResetEvent.zig"); +pub const Mutex = @import("Thread/Mutex.zig"); +pub const Semaphore = @import("Thread/Semaphore.zig"); +pub const Condition = @import("Thread/Condition.zig"); + +pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc; + +const Thread = @This(); +const std = @import("std.zig"); +const builtin = std.builtin; +const os = std.os; +const mem = std.mem; +const windows = std.os.windows; +const c = std.c; +const assert = std.debug.assert; + +const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; + +/// Represents a kernel thread handle. +/// May be an integer or a pointer depending on the platform. +/// On Linux and POSIX, this is the same as Id. 
+pub const Handle = if (use_pthreads) + c.pthread_t +else switch (std.Target.current.os.tag) { + .linux => i32, + .windows => windows.HANDLE, + else => void, +}; + +/// Represents a unique ID per thread. +/// May be an integer or pointer depending on the platform. +/// On Linux and POSIX, this is the same as Handle. +pub const Id = switch (std.Target.current.os.tag) { + .windows => windows.DWORD, + else => Handle, +}; + +pub const Data = if (use_pthreads) + struct { + handle: Thread.Handle, + memory: []u8, + } +else switch (std.Target.current.os.tag) { + .linux => struct { + handle: Thread.Handle, + memory: []align(mem.page_size) u8, + }, + .windows => struct { + handle: Thread.Handle, + alloc_start: *c_void, + heap_handle: windows.HANDLE, + }, + else => struct {}, +}; + +/// Signals the processor that it is inside a busy-wait spin-loop ("spin lock"). +pub fn spinLoopHint() void { + switch (std.Target.current.cpu.arch) { + .i386, .x86_64 => asm volatile ("pause" + : + : + : "memory" + ), + .arm, .aarch64 => asm volatile ("yield" + : + : + : "memory" + ), + else => {}, + } +} + +/// Returns the ID of the calling thread. +/// Makes a syscall every time the function is called. +/// On Linux and POSIX, this Id is the same as a Handle. +pub fn getCurrentId() Id { + if (use_pthreads) { + return c.pthread_self(); + } else + return switch (std.Target.current.os.tag) { + .linux => os.linux.gettid(), + .windows => windows.kernel32.GetCurrentThreadId(), + else => @compileError("Unsupported OS"), + }; +} + +/// Returns the handle of this thread. +/// On Linux and POSIX, this is the same as Id. +/// On Linux, it is possible that the thread spawned with `spawn` +/// finishes executing entirely before the clone syscall completes. In this +/// case, this function will return 0 rather than the no-longer-existing thread's +/// pid. 
+pub fn handle(self: Thread) Handle { + return self.data.handle; +} + +pub fn wait(self: *Thread) void { + if (use_pthreads) { + const err = c.pthread_join(self.data.handle, null); + switch (err) { + 0 => {}, + os.EINVAL => unreachable, + os.ESRCH => unreachable, + os.EDEADLK => unreachable, + else => unreachable, + } + std.heap.c_allocator.free(self.data.memory); + std.heap.c_allocator.destroy(self); + } else switch (std.Target.current.os.tag) { + .linux => { + while (true) { + const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst); + if (pid_value == 0) break; + const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + os.EINTR => continue, + os.EAGAIN => continue, + else => unreachable, + } + } + os.munmap(self.data.memory); + }, + .windows => { + windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable; + windows.CloseHandle(self.data.handle); + windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start); + }, + else => @compileError("Unsupported OS"), + } +} + +pub const SpawnError = error{ + /// A system-imposed limit on the number of threads was encountered. + /// There are a number of limits that may trigger this error: + /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), + /// which limits the number of processes and threads for a real + /// user ID, was reached; + /// * the kernel's system-wide limit on the number of processes and + /// threads, /proc/sys/kernel/threads-max, was reached (see + /// proc(5)); + /// * the maximum number of PIDs, /proc/sys/kernel/pid_max, was + /// reached (see proc(5)); or + /// * the PID limit (pids.max) imposed by the cgroup "process num‐ + /// ber" (PIDs) controller was reached. 
+ ThreadQuotaExceeded, + + /// The kernel cannot allocate sufficient memory to allocate a task structure + /// for the child, or to copy those parts of the caller's context that need to + /// be copied. + SystemResources, + + /// Not enough userland memory to spawn the thread. + OutOfMemory, + + /// `mlockall` is enabled, and the memory needed to spawn the thread + /// would exceed the limit. + LockedMemoryLimitExceeded, + + Unexpected, +}; + +/// caller must call wait on the returned thread +/// fn startFn(@TypeOf(context)) T +/// where T is u8, noreturn, void, or !void +/// caller must call wait on the returned thread +pub fn spawn(context: anytype, comptime startFn: anytype) SpawnError!*Thread { + if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode"); + // TODO compile-time call graph analysis to determine stack upper bound + // https://github.com/ziglang/zig/issues/157 + const default_stack_size = 16 * 1024 * 1024; + + const Context = @TypeOf(context); + comptime assert(@typeInfo(@TypeOf(startFn)).Fn.args[0].arg_type.? 
== Context); + + if (std.Target.current.os.tag == .windows) { + const WinThread = struct { + const OuterContext = struct { + thread: Thread, + inner: Context, + }; + fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return 0; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + return startFn(arg); + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return 0; + }, + else => @compileError(bad_startfn_ret), + } + } + }; + + const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; + const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); + const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory; + errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0); + const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count]; + const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; + outer_context.* = WinThread.OuterContext{ + .thread = Thread{ + .data = Thread.Data{ + .heap_handle = heap_handle, + .alloc_start = bytes_ptr, + .handle = undefined, + }, + }, + .inner = context, + }; + + const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner); + outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse { + switch (windows.kernel32.GetLastError()) { + else => |err| 
return windows.unexpectedError(err), + } + }; + return &outer_context.thread; + } + + const MainFuncs = struct { + fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 { + const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return 0; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + return startFn(arg); + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return 0; + }, + else => @compileError(bad_startfn_ret), + } + } + fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return null; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + // pthreads don't support exit status, ignore value + _ = startFn(arg); + return null; + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return null; + }, + else => @compileError(bad_startfn_ret), + } + } + }; + + if (Thread.use_pthreads) { + var attr: c.pthread_attr_t = undefined; + if (c.pthread_attr_init(&attr) != 0) return error.SystemResources; + defer assert(c.pthread_attr_destroy(&attr) == 0); + + const thread_obj = try std.heap.c_allocator.create(Thread); + errdefer std.heap.c_allocator.destroy(thread_obj); + if 
(@sizeOf(Context) > 0) { + thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced( + u8, + @alignOf(Context), + @sizeOf(Context), + .at_least, + ); + errdefer std.heap.c_allocator.free(thread_obj.data.memory); + mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context)); + } else { + thread_obj.data.memory = @as([*]u8, undefined)[0..0]; + } + + // Use the same set of parameters used by the libc-less impl. + assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0); + assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0); + + const err = c.pthread_create( + &thread_obj.data.handle, + &attr, + MainFuncs.posixThreadMain, + thread_obj.data.memory.ptr, + ); + switch (err) { + 0 => return thread_obj, + os.EAGAIN => return error.SystemResources, + os.EPERM => unreachable, + os.EINVAL => unreachable, + else => return os.unexpectedErrno(@intCast(usize, err)), + } + + return thread_obj; + } + + var guard_end_offset: usize = undefined; + var stack_end_offset: usize = undefined; + var thread_start_offset: usize = undefined; + var context_start_offset: usize = undefined; + var tls_start_offset: usize = undefined; + const mmap_len = blk: { + var l: usize = mem.page_size; + // Allocate a guard page right after the end of the stack region + guard_end_offset = l; + // The stack itself, which grows downwards. + l = mem.alignForward(l + default_stack_size, mem.page_size); + stack_end_offset = l; + // Above the stack, so that it can be in the same mmap call, put the Thread object. + l = mem.alignForward(l, @alignOf(Thread)); + thread_start_offset = l; + l += @sizeOf(Thread); + // Next, the Context object. + if (@sizeOf(Context) != 0) { + l = mem.alignForward(l, @alignOf(Context)); + context_start_offset = l; + l += @sizeOf(Context); + } + // Finally, the Thread Local Storage, if any. + l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align); + tls_start_offset = l; + l += os.linux.tls.tls_image.alloc_size; + // Round the size to the page size. 
+ break :blk mem.alignForward(l, mem.page_size); + }; + + const mmap_slice = mem: { + // Map the whole stack with no rw permissions to avoid + // committing the whole region right away + const mmap_slice = os.mmap( + null, + mmap_len, + os.PROT_NONE, + os.MAP_PRIVATE | os.MAP_ANONYMOUS, + -1, + 0, + ) catch |err| switch (err) { + error.MemoryMappingNotSupported => unreachable, + error.AccessDenied => unreachable, + error.PermissionDenied => unreachable, + else => |e| return e, + }; + errdefer os.munmap(mmap_slice); + + // Map everything but the guard page as rw + os.mprotect( + mmap_slice[guard_end_offset..], + os.PROT_READ | os.PROT_WRITE, + ) catch |err| switch (err) { + error.AccessDenied => unreachable, + else => |e| return e, + }; + + break :mem mmap_slice; + }; + + const mmap_addr = @ptrToInt(mmap_slice.ptr); + + const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset)); + thread_ptr.data.memory = mmap_slice; + + var arg: usize = undefined; + if (@sizeOf(Context) != 0) { + arg = mmap_addr + context_start_offset; + const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg)); + context_ptr.* = context; + } + + if (std.Target.current.os.tag == .linux) { + const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | + os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | + os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | + os.CLONE_DETACHED | os.CLONE_SETTLS; + // This structure is only needed when targeting i386 + var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined; + + const tls_area = mmap_slice[tls_start_offset..]; + const tp_value = os.linux.tls.prepareTLS(tls_area); + + const newtls = blk: { + if (std.Target.current.cpu.arch == .i386) { + user_desc = os.linux.user_desc{ + .entry_number = os.linux.tls.tls_image.gdt_entry_number, + .base_addr = tp_value, + .limit = 0xfffff, + .seg_32bit = 1, + .contents = 0, // Data + .read_exec_only = 0, + 
.limit_in_pages = 1, + .seg_not_present = 0, + .useable = 1, + }; + break :blk @ptrToInt(&user_desc); + } else { + break :blk tp_value; + } + }; + + const rc = os.linux.clone( + MainFuncs.linuxThreadMain, + mmap_addr + stack_end_offset, + flags, + arg, + &thread_ptr.data.handle, + newtls, + &thread_ptr.data.handle, + ); + switch (os.errno(rc)) { + 0 => return thread_ptr, + os.EAGAIN => return error.ThreadQuotaExceeded, + os.EINVAL => unreachable, + os.ENOMEM => return error.SystemResources, + os.ENOSPC => unreachable, + os.EPERM => unreachable, + os.EUSERS => unreachable, + else => |err| return os.unexpectedErrno(err), + } + } else { + @compileError("Unsupported OS"); + } +} + +pub const CpuCountError = error{ + PermissionDenied, + SystemResources, + Unexpected, +}; + +pub fn cpuCount() CpuCountError!usize { + if (std.Target.current.os.tag == .linux) { + const cpu_set = try os.sched_getaffinity(0); + return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast + } + if (std.Target.current.os.tag == .windows) { + return os.windows.peb().NumberOfProcessors; + } + if (std.Target.current.os.tag == .openbsd) { + var count: c_int = undefined; + var count_size: usize = @sizeOf(c_int); + const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; + os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); + } + var count: c_int = undefined; + var count_len: usize = @sizeOf(c_int); + const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; + os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); +} + +pub fn getCurrentThreadId() u64 { + switch (std.Target.current.os.tag) { + .linux => { + // Use the syscall directly as musl doesn't provide a wrapper. 
+ return @bitCast(u32, os.linux.gettid()); + }, + .windows => { + return os.windows.kernel32.GetCurrentThreadId(); + }, + .macos, .ios, .watchos, .tvos => { + var thread_id: u64 = undefined; + // Pass thread=null to get the current thread ID. + assert(c.pthread_threadid_np(null, &thread_id) == 0); + return thread_id; + }, + .netbsd => { + return @bitCast(u32, c._lwp_self()); + }, + .freebsd => { + return @bitCast(u32, c.pthread_getthreadid_np()); + }, + .openbsd => { + return @bitCast(u32, c.getthrid()); + }, + else => { + @compileError("getCurrentThreadId not implemented for this platform"); + }, + } +} + +test "" { + std.testing.refAllDecls(@This()); +} diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig new file mode 100644 index 0000000000..8b8b5658bf --- /dev/null +++ b/lib/std/Thread/AutoResetEvent.zig @@ -0,0 +1,228 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. +//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). +//! +//! AutoResetEvent has 3 possible states: +//! - UNSET: the AutoResetEvent is currently unset +//! - SET: the AutoResetEvent was notified before a wait() was called +//! - : there is an active waiter waiting for a notification. +//! +//! When attempting to wait: +//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set +//! if the event is already set, then it consumes the notification and resets the event. +//! +//! When attempting to notify: +//! if the event is unset, then we set the event +//! if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent +//! +//! 
This ensures that the event is automatically reset after a wait() has been issued +//! and avoids the race condition when using StaticResetEvent in the following scenario: +//! thread 1 | thread 2 +//! StaticResetEvent.wait() | +//! | StaticResetEvent.set() +//! | StaticResetEvent.set() +//! StaticResetEvent.reset() | +//! StaticResetEvent.wait() | (missed the second .set() notification above) + +state: usize = UNSET, + +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const assert = std.debug.assert; +const StaticResetEvent = std.Thread.StaticResetEvent; +const AutoResetEvent = @This(); + +const UNSET = 0; +const SET = 1; + +/// the minimum alignment for the `*StaticResetEvent` created by wait*() +const event_align = std.math.max(@alignOf(StaticResetEvent), 2); + +pub fn wait(self: *AutoResetEvent) void { + self.waitFor(null) catch unreachable; +} + +pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { + return self.waitFor(timeout); +} + +fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { + // lazily initialized StaticResetEvent + var reset_event: StaticResetEvent align(event_align) = undefined; + var has_reset_event = false; + + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // consume a notification if there is any + if (state == SET) { + @atomicStore(usize, &self.state, UNSET, .SeqCst); + return; + } + + // check if theres currently a pending ResetEvent pointer already registered + if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent + } + + // lazily initialize the ResetEvent if it hasn't been already + if (!has_reset_event) { + has_reset_event = true; + reset_event = .{}; + } + + // Since the AutoResetEvent currently isnt set, + // try to register our ResetEvent on it to wait + // for a set() call from another thread. 
+ if (@cmpxchgWeak( + usize, + &self.state, + UNSET, + @ptrToInt(&reset_event), + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + // if no timeout was specified, then just wait forever + const timeout_ns = timeout orelse { + reset_event.wait(); + return; + }; + + // wait with a timeout and return if signalled via set() + switch (reset_event.timedWait(timeout_ns)) { + .event_set => return, + .timed_out => {}, + } + + // If we timed out, we need to transition the AutoResetEvent back to UNSET. + // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. + state = @cmpxchgStrong( + usize, + &self.state, + @ptrToInt(&reset_event), + UNSET, + .SeqCst, + .SeqCst, + ) orelse return error.TimedOut; + + // We didn't manage to unregister ourselves from the state. + if (state == SET) { + unreachable; // AutoResetEvent notified without waking up the waiting thread + } else if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out + } + + // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. + // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. + // We don't return error.TimedOut here as it technically notified us while we were "timing out". + reset_event.wait(); + return; + } +} + +pub fn set(self: *AutoResetEvent) void { + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // If the AutoResetEvent is already set, there is nothing else left to do + if (state == SET) { + return; + } + + // If the AutoResetEvent isn't set, + // then try to leave a notification for the wait() thread that we set() it. + if (state == UNSET) { + state = @cmpxchgWeak( + usize, + &self.state, + UNSET, + SET, + .SeqCst, + .SeqCst, + ) orelse return; + continue; + } + + // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. 
+ // Try to acquire ownership of it so that we can wake it up. + // This also resets the AutoResetEvent so that there is no race condition as defined above. + if (@cmpxchgWeak( + usize, + &self.state, + state, + UNSET, + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); + reset_event.set(); + return; + } +} + +test "basic usage" { + // test local code paths + { + var event = AutoResetEvent{}; + testing.expectError(error.TimedOut, event.timedWait(1)); + event.set(); + event.wait(); + } + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + value: u128 = 0, + in: AutoResetEvent = AutoResetEvent{}, + out: AutoResetEvent = AutoResetEvent{}, + + const Self = @This(); + + fn sender(self: *Self) void { + testing.expect(self.value == 0); + self.value = 1; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 2); + self.value = 3; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 4); + } + + fn receiver(self: *Self) void { + self.out.wait(); + testing.expect(self.value == 1); + self.value = 2; + self.in.set(); + + self.out.wait(); + testing.expect(self.value == 3); + self.value = 4; + self.in.set(); + } + }; + + var context = Context{}; + const send_thread = try std.Thread.spawn(&context, Context.sender); + const recv_thread = try std.Thread.spawn(&context, Context.receiver); + + send_thread.wait(); + recv_thread.wait(); +} diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig new file mode 100644 index 0000000000..2379d264d1 --- /dev/null +++ b/lib/std/Thread/Condition.zig @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. 
+ +//! A condition provides a way for a kernel thread to block until it is signaled +//! to wake up. Spurious wakeups are possible. +//! This API supports static initialization and does not require deinitialization. + +impl: Impl, + +const std = @import("../std.zig"); +const Condition = @This(); +const windows = std.os.windows; +const linux = std.os.linux; +const Mutex = std.Thread.Mutex; +const assert = std.debug.assert; + +const Impl = if (std.builtin.single_threaded) + SingleThreadedCondition +else if (std.Target.current.os.tag == .windows) + WindowsCondition +else if (std.Thread.use_pthreads) + PthreadCondition +else + AtomicCondition; + +pub const SingleThreadedCondition = struct { + pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void { + unreachable; // deadlock detected + } + + pub fn signal(cond: *SingleThreadedCondition) void {} + + pub fn broadcast(cond: *SingleThreadedCondition) void {} +}; + +pub const WindowsCondition = struct { + cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT, + + pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void { + const rc = windows.SleepConditionVariableSRW( + &cond.cond, + &mutex.srwlock, + windows.INFINITE, + @as(windows.ULONG, 0), + ); + assert(rc != windows.FALSE); + } + + pub fn signal(cond: *WindowsCondition) void { + windows.WakeConditionVariable(&cond.cond); + } + + pub fn broadcast(cond: *WindowsCondition) void { + windows.WakeAllConditionVariable(&cond.cond); + } +}; + +pub const PthreadCondition = struct { + cond: std.c.pthread_cond_t = .{}, + + pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void { + const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.mutex); + assert(rc == 0); + } + + pub fn signal(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_signal(&cond.cond); + assert(rc == 0); + } + + pub fn broadcast(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_broadcast(&cond.cond); + assert(rc == 0); + } +}; + +pub const AtomicCondition = struct 
{ + pending: bool = false, + queue_mutex: Mutex = .{}, + queue_list: QueueList = .{}, + + pub const QueueList = std.SinglyLinkedList(QueueItem); + + pub const QueueItem = struct { + futex: i32 = 0, + + fn wait(cond: *@This()) void { + while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) { + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + 0, + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => spinLoopHint(), + } + } + } + + fn notify(cond: *@This()) void { + @atomicStore(i32, &cond.futex, 1, .Release); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } + }; + + pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void { + var waiter = QueueList.Node{ .data = .{} }; + + { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + cond.queue_list.prepend(&waiter); + @atomicStore(bool, &cond.pending, true, .SeqCst); + } + + mutex.unlock(); + waiter.data.wait(); + mutex.lock(); + } + + pub fn signal(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + const maybe_waiter = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const maybe_waiter = cond.queue_list.popFirst(); + @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst); + break :blk maybe_waiter; + }; + + if (maybe_waiter) |waiter| + waiter.data.notify(); + } + + pub fn broadcast(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + @atomicStore(bool, &cond.pending, false, .SeqCst); + + var waiters = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const 
waiters = cond.queue_list; + cond.queue_list = .{}; + break :blk waiters; + }; + + while (waiters.popFirst()) |waiter| + waiter.data.notify(); + } +}; diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig new file mode 100644 index 0000000000..128eb0be80 --- /dev/null +++ b/lib/std/Thread/Mutex.zig @@ -0,0 +1,303 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Lock may be held only once. If the same thread tries to acquire +//! the same mutex twice, it deadlocks. This type supports static +//! initialization and is at most `@sizeOf(usize)` in size. When an +//! application is built in single threaded release mode, all the +//! functions are no-ops. In single threaded debug mode, there is +//! deadlock detection. +//! +//! Example usage: +//! var m = Mutex{}; +//! +//! const lock = m.acquire(); +//! defer lock.release(); +//! ... critical code +//! +//! Non-blocking: +//! if (m.tryAcquire) |lock| { +//! defer lock.release(); +//! // ... critical section +//! } else { +//! // ... lock not acquired +//! } + +impl: Impl = .{}, + +const Mutex = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const os = std.os; +const assert = std.debug.assert; +const windows = os.windows; +const linux = os.linux; +const testing = std.testing; +const StaticResetEvent = std.thread.StaticResetEvent; + +pub const Held = struct { + impl: *Impl, + + pub fn release(held: Held) void { + held.impl.release(); + } +}; + +/// Try to acquire the mutex without blocking. Returns null if +/// the mutex is unavailable. Otherwise returns Held. Call +/// release on Held. +pub fn tryAcquire(m: *Mutex) ?Held { + if (m.impl.tryAcquire()) { + return Held{ .impl = &m.impl }; + } else { + return null; + } +} + +/// Acquire the mutex. 
Deadlocks if the mutex is already +/// held by the calling thread. +pub fn acquire(m: *Mutex) Held { + m.impl.acquire(); + return .{ .impl = &m.impl }; +} + +const Impl = if (builtin.single_threaded) + Dummy +else if (builtin.os.tag == .windows) + WindowsMutex +else if (std.Thread.use_pthreads) + PthreadMutex +else + AtomicMutex; + +pub const AtomicMutex = struct { + state: State = .unlocked, + + const State = enum(i32) { + unlocked, + locked, + waiting, + }; + + pub fn tryAcquire(self: *AtomicMutex) bool { + return @cmpxchgStrong( + State, + &self.state, + .unlocked, + .locked, + .Acquire, + .Monotonic, + ) == null; + } + + pub fn acquire(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) { + .unlocked => {}, + else => |s| self.lockSlow(s), + } + } + + fn lockSlow(self: *AtomicMutex, current_state: State) void { + @setCold(true); + var new_state = current_state; + + var spin: u8 = 0; + while (spin < 100) : (spin += 1) { + const state = @cmpxchgWeak( + State, + &self.state, + .unlocked, + new_state, + .Acquire, + .Monotonic, + ) orelse return; + + switch (state) { + .unlocked => {}, + .locked => {}, + .waiting => break, + } + + var iter = std.math.min(32, spin + 1); + while (iter > 0) : (iter -= 1) + std.Thread.spinLoopHint(); + } + + new_state = .waiting; + while (true) { + switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) { + .unlocked => return, + else => {}, + } + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + @enumToInt(new_state), + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => std.Thread.spinLoopHint(), + } + } + } + + pub fn release(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { + .unlocked => unreachable, + .locked => {}, + .waiting => 
self.unlockSlow(), + } + } + + fn unlockSlow(self: *AtomicMutex) void { + @setCold(true); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } +}; + +pub const PthreadMutex = struct { + pthread_mutex: std.c.pthread_mutex_t = .{}, + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *PthreadMutex) bool { + return std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0; + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EBUSY => unreachable, + std.c.EAGAIN => unreachable, + std.c.EDEADLK => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } + + pub fn release(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_unlock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EAGAIN => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } +}; + +/// This has the sematics as `Mutex`, however it does not actually do any +/// synchronization. Operations are safety-checked no-ops. +pub const Dummy = struct { + lock: @TypeOf(lock_init) = lock_init, + + const lock_init = if (std.debug.runtime_safety) false else {}; + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *Dummy) bool { + if (std.debug.runtime_safety) { + if (self.lock) return false; + self.lock = true; + } + return true; + } + + /// Acquire the mutex. 
Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *Dummy) void { + return self.tryAcquire() orelse @panic("deadlock detected"); + } + + pub fn release(self: *Dummy) void { + if (std.debug.runtime_safety) { + self.mutex.lock = false; + } + } +}; + +const WindowsMutex = struct { + srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT, + + pub fn tryAcquire(self: *WindowsMutex) bool { + return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; + } + + pub fn acquire(self: *WindowsMutex) void { + AcquireSRWLockExclusive(&self.srwlock); + } + + pub fn release(self: *WindowsMutex) void { + ReleaseSRWLockExclusive(&self.srwlock); + } +}; + +const TestContext = struct { + mutex: *Mutex, + data: i128, + + const incr_count = 10000; +}; + +test "basic usage" { + var mutex = Mutex{}; + + var context = TestContext{ + .mutex = &mutex, + .data = 0, + }; + + if (builtin.single_threaded) { + worker(&context); + testing.expect(context.data == TestContext.incr_count); + } else { + const thread_count = 10; + var threads: [thread_count]*std.Thread = undefined; + for (threads) |*t| { + t.* = try std.Thread.spawn(&context, worker); + } + for (threads) |t| + t.wait(); + + testing.expect(context.data == thread_count * TestContext.incr_count); + } +} + +fn worker(ctx: *TestContext) void { + var i: usize = 0; + while (i != TestContext.incr_count) : (i += 1) { + const held = ctx.mutex.acquire(); + defer held.release(); + + ctx.data += 1; + } +} diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig new file mode 100644 index 0000000000..622f9be98e --- /dev/null +++ b/lib/std/Thread/ResetEvent.zig @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! 
A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. +//! If you need an abstraction that cannot fail to be initialized, see +//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure, +//! it is preferred to use `ResetEvent`. + +const ResetEvent = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const testing = std.testing; +const assert = std.debug.assert; +const c = std.c; +const os = std.os; +const time = std.time; + +impl: Impl, + +pub const Impl = if (builtin.single_threaded) + std.Thread.StaticResetEvent.DebugEvent +else if (std.Target.current.isDarwin()) + DarwinEvent +else if (std.Thread.use_pthreads) + PosixEvent +else + std.Thread.StaticResetEvent.AtomicEvent; + +pub const InitError = error{SystemResources}; + +/// After `init`, it is legal to call any other function. +pub fn init(ev: *ResetEvent) InitError!void { + return ev.impl.init(); +} + +/// This function is not thread-safe. +/// After `deinit`, the only legal function to call is `init`. +pub fn deinit(ev: *ResetEvent) void { + return ev.impl.deinit(); +} + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *ResetEvent) void { + return ev.impl.set(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *ResetEvent) void { + return ev.impl.reset(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. 
No spurious wakeups. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn wait(ev: *ResetEvent) void { + return ev.impl.wait(); +} + +pub const TimedWaitResult = enum { event_set, timed_out }; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// Apple has decided to not support POSIX semaphores, so we go with a +/// different approach using Grand Central Dispatch. This API is exposed +/// by libSystem so it is guaranteed to be available on all Darwin platforms. +pub const DarwinEvent = struct { + sem: c.dispatch_semaphore_t = undefined, + + pub fn init(ev: *DarwinEvent) !void { + ev.* = .{ + .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, + }; + } + + pub fn deinit(ev: *DarwinEvent) void { + c.dispatch_release(ev.sem); + ev.* = undefined; + } + + pub fn set(ev: *DarwinEvent) void { + // Empirically this returns the numerical value of the semaphore. + _ = c.dispatch_semaphore_signal(ev.sem); + } + + pub fn wait(ev: *DarwinEvent) void { + assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + } + + pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { + const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); + if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { + return .timed_out; + } else { + return .event_set; + } + } + + pub fn reset(ev: *DarwinEvent) void { + // Keep calling until the semaphore goes back down to 0. 
+ while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + } +}; + +/// POSIX semaphores must be initialized at runtime because they are allowed to +/// be implemented as file descriptors, in which case initialization would require +/// a syscall to open the fd. +pub const PosixEvent = struct { + sem: c.sem_t = undefined, + + pub fn init(ev: *PosixEvent) !void { + switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { + 0 => return, + else => return error.SystemResources, + } + } + + pub fn deinit(ev: *PosixEvent) void { + assert(c.sem_destroy(&ev.sem) == 0); + ev.* = undefined; + } + + pub fn set(ev: *PosixEvent) void { + assert(c.sem_post(&ev.sem) == 0); + } + + pub fn wait(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&ev.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } + } + } + + pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { + var ts: os.timespec = undefined; + var timeout_abs = timeout_ns; + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { + 0 => return .event_set, + c.EINTR => continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return .timed_out, + else => unreachable, + } + } + } + + pub fn reset(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_trywait(&ev.sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. 
+ else => unreachable, + } + } + } +}; + +test "basic usage" { + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128, + in: ResetEvent, + out: ResetEvent, + + fn init(self: *Self) !void { + self.* = .{ + .value = 0, + .in = undefined, + .out = undefined, + }; + try self.in.init(); + try self.out.init(); + } + + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context: Context = undefined; + try context.init(); + defer context.deinit(); + const receiver = try 
std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig new file mode 100644 index 0000000000..1d606a9cf1 --- /dev/null +++ b/lib/std/Thread/RwLock.zig @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A lock that supports one writer or many readers. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. + +impl: Impl, + +const RwLock = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const assert = std.debug.assert; +const Mutex = std.Thread.Mutex; +const Semaphore = std.Semaphore; +const CondVar = std.CondVar; + +pub const Impl = if (builtin.single_threaded) + SingleThreadedRwLock +else if (std.Thread.use_pthreads) + PthreadRwLock +else + DefaultRwLock; + +pub fn init(rwl: *RwLock) void { + return rwl.impl.init(); +} + +pub fn deinit(rwl: *RwLock) void { + return rwl.impl.deinit(); +} + +/// Attempts to obtain exclusive lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLock(rwl: *RwLock) bool { + return rwl.impl.tryLock(); +} + +/// Blocks until exclusive lock ownership is acquired. +pub fn lock(rwl: *RwLock) void { + return rwl.impl.lock(); +} + +/// Releases a held exclusive lock. 
+/// Asserts the lock is held exclusively. +pub fn unlock(rwl: *RwLock) void { + return rwl.impl.unlock(); +} + +/// Attempts to obtain shared lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLockShared(rwl: *RwLock) bool { + return rwl.impl.tryLockShared(); +} + +/// Blocks until shared lock ownership is acquired. +pub fn lockShared(rwl: *RwLock) void { + return rwl.impl.lockShared(); +} + +/// Releases a held shared lock. +pub fn unlockShared(rwl: *RwLock) void { + return rwl.impl.unlockShared(); +} + +/// Single-threaded applications use this for deadlock checks in +/// debug mode, and no-ops in release modes. +pub const SingleThreadedRwLock = struct { + state: enum { unlocked, locked_exclusive, locked_shared }, + shared_count: usize, + + pub fn init(rwl: *SingleThreadedRwLock) void { + rwl.* = .{ + .state = .unlocked, + .shared_count = 0, + }; + } + + pub fn deinit(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); + assert(rwl.shared_count == 0); + } + + /// Attempts to obtain exclusive lock ownership. + /// Returns `true` if the lock is obtained, `false` otherwise. + pub fn tryLock(rwl: *SingleThreadedRwLock) bool { + switch (rwl.state) { + .unlocked => { + assert(rwl.shared_count == 0); + rwl.state = .locked_exclusive; + return true; + }, + .locked_exclusive, .locked_shared => return false, + } + } + + /// Blocks until exclusive lock ownership is acquired. + pub fn lock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); // deadlock detected + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .locked_exclusive; + } + + /// Releases a held exclusive lock. + /// Asserts the lock is held exclusively. + pub fn unlock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .locked_exclusive); + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .unlocked; + } + + /// Attempts to obtain shared lock ownership. 
+    /// Returns `true` if the lock is obtained, `false` otherwise.
+    pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool {
+        switch (rwl.state) {
+            .unlocked => {
+                rwl.state = .locked_shared;
+                assert(rwl.shared_count == 0);
+                rwl.shared_count = 1;
+                return true;
+            },
+            .locked_exclusive, .locked_shared => return false,
+        }
+    }
+
+    /// Blocks until shared lock ownership is acquired.
+    pub fn lockShared(rwl: *SingleThreadedRwLock) void {
+        switch (rwl.state) {
+            .unlocked => {
+                rwl.state = .locked_shared;
+                assert(rwl.shared_count == 0);
+                rwl.shared_count = 1;
+            },
+            .locked_shared => {
+                rwl.shared_count += 1;
+            },
+            .locked_exclusive => unreachable, // deadlock detected
+        }
+    }
+
+    /// Releases a held shared lock.
+    pub fn unlockShared(rwl: *SingleThreadedRwLock) void {
+        switch (rwl.state) {
+            .unlocked => unreachable, // too many calls to `unlockShared`
+            .locked_exclusive => unreachable, // exclusively held lock
+            .locked_shared => {
+                rwl.shared_count -= 1;
+                if (rwl.shared_count == 0) {
+                    rwl.state = .unlocked;
+                }
+            },
+        }
+    }
+};
+
+pub const PthreadRwLock = struct {
+    rwlock: std.c.pthread_rwlock_t,
+
+    pub fn init(rwl: *PthreadRwLock) void {
+        rwl.* = .{ .rwlock = .{} };
+    }
+
+    pub fn deinit(rwl: *PthreadRwLock) void {
+        const safe_rc = switch (std.builtin.os.tag) {
+            .dragonfly, .netbsd => std.os.EAGAIN,
+            else => 0,
+        };
+
+        const rc = std.c.pthread_rwlock_destroy(&rwl.rwlock);
+        assert(rc == 0 or rc == safe_rc);
+
+        rwl.* = undefined;
+    }
+
+    pub fn tryLock(rwl: *PthreadRwLock) bool {
+        return std.c.pthread_rwlock_trywrlock(&rwl.rwlock) == 0;
+    }
+
+    pub fn lock(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_wrlock(&rwl.rwlock);
+        assert(rc == 0);
+    }
+
+    pub fn unlock(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
+        assert(rc == 0);
+    }
+
+    pub fn tryLockShared(rwl: *PthreadRwLock) bool {
+        return std.c.pthread_rwlock_tryrdlock(&rwl.rwlock) == 0;
+    }
+
+    pub fn lockShared(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_rdlock(&rwl.rwlock);
+        assert(rc == 0);
+    }
+
+    pub fn unlockShared(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
+        assert(rc == 0);
+    }
+};
+
+pub const DefaultRwLock = struct {
+    state: usize,
+    mutex: Mutex,
+    semaphore: Semaphore,
+
+    const IS_WRITING: usize = 1;
+    const WRITER: usize = 1 << 1;
+    const READER: usize = 1 << (1 + std.meta.bitCount(Count));
+    const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER);
+    const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER);
+    const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2));
+
+    pub fn init(rwl: *DefaultRwLock) void {
+        rwl.* = .{
+            .state = 0,
+            .mutex = Mutex.init(),
+            .semaphore = Semaphore.init(0),
+        };
+    }
+
+    pub fn deinit(rwl: *DefaultRwLock) void {
+        rwl.semaphore.deinit();
+        rwl.mutex.deinit();
+        rwl.* = undefined;
+    }
+
+    pub fn tryLock(rwl: *DefaultRwLock) bool {
+        if (rwl.mutex.tryLock()) {
+            const state = @atomicLoad(usize, &rwl.state, .SeqCst);
+            if (state & READER_MASK == 0) {
+                _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
+                return true;
+            }
+
+            rwl.mutex.unlock();
+        }
+
+        return false;
+    }
+
+    pub fn lock(rwl: *DefaultRwLock) void {
+        _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst);
+        rwl.mutex.lock();
+
+        const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
+        if (state & READER_MASK != 0)
+            rwl.semaphore.wait();
+    }
+
+    pub fn unlock(rwl: *DefaultRwLock) void {
+        _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .SeqCst);
+        rwl.mutex.unlock();
+    }
+
+    pub fn tryLockShared(rwl: *DefaultRwLock) bool {
+        const state = @atomicLoad(usize, &rwl.state, .SeqCst);
+        if (state & (IS_WRITING | WRITER_MASK) == 0) {
+            _ = @cmpxchgStrong(
+                usize,
+                &rwl.state,
+                state,
+                state + READER,
+                .SeqCst,
+                .SeqCst,
+            ) orelse return true;
+        }
+
+        if (rwl.mutex.tryLock()) {
+            _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
+            rwl.mutex.unlock();
+            return
true;
+        }
+
+        return false;
+    }
+
+    pub fn lockShared(rwl: *DefaultRwLock) void {
+        var state = @atomicLoad(usize, &rwl.state, .SeqCst);
+        while (state & (IS_WRITING | WRITER_MASK) == 0) {
+            state = @cmpxchgWeak(
+                usize,
+                &rwl.state,
+                state,
+                state + READER,
+                .SeqCst,
+                .SeqCst,
+            ) orelse return;
+        }
+
+        rwl.mutex.lock();
+        _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
+        rwl.mutex.unlock();
+    }
+
+    pub fn unlockShared(rwl: *DefaultRwLock) void {
+        const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .SeqCst);
+
+        if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
+            rwl.semaphore.post();
+    }
+};
diff --git a/lib/std/Thread/Semaphore.zig b/lib/std/Thread/Semaphore.zig
new file mode 100644
index 0000000000..77a278b355
--- /dev/null
+++ b/lib/std/Thread/Semaphore.zig
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A semaphore is an unsigned integer that blocks the kernel thread if
+//! the number would become negative.
+//! This API supports static initialization and does not require deinitialization.
+
+mutex: Mutex = .{},
+cond: Condition = .{},
+/// It is OK to initialize this field to any value.
+permits: usize = 0,
+
+const Semaphore = @This();
+const std = @import("../std.zig");
+const Mutex = std.Thread.Mutex;
+const Condition = std.Thread.Condition;
+
+pub fn wait(sem: *Semaphore) void {
+    const held = sem.mutex.acquire();
+    defer held.release();
+
+    while (sem.permits == 0)
+        sem.cond.wait(&sem.mutex);
+
+    sem.permits -= 1;
+    if (sem.permits > 0)
+        sem.cond.signal();
+}
+
+pub fn post(sem: *Semaphore) void {
+    const held = sem.mutex.acquire();
+    defer held.release();
+
+    sem.permits += 1;
+    sem.cond.signal();
+}
diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig
new file mode 100644
index 0000000000..414583e477
--- /dev/null
+++ b/lib/std/Thread/StaticResetEvent.zig
@@ -0,0 +1,396 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API is statically initializable. It cannot fail to be initialized
+//! and it requires no deinitialization. The downside is that it may not
+//! integrate as cleanly into other synchronization APIs, or, in a worst case,
+//! may be forced to fall back on spin locking. As a rule of thumb, prefer
+//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when
+//! the logic needs stronger API guarantees.
+ +const std = @import("../std.zig"); +const StaticResetEvent = @This(); +const SpinLock = std.SpinLock; +const assert = std.debug.assert; +const os = std.os; +const time = std.time; +const linux = std.os.linux; +const windows = std.os.windows; +const testing = std.testing; + +impl: Impl = .{}, + +pub const Impl = if (std.builtin.single_threaded) + DebugEvent +else + AtomicEvent; + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *StaticResetEvent) void { + return ev.impl.set(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn wait(ev: *StaticResetEvent) void { + return ev.impl.wait(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *StaticResetEvent) void { + return ev.impl.reset(); +} + +pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `timedWait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// For single-threaded builds, we use this to detect deadlocks. 
+/// In unsafe modes this ends up being no-ops. +pub const DebugEvent = struct { + state: State = State.unset, + + const State = enum { + unset, + set, + waited, + }; + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn init(ev: *DebugEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn deinit(ev: *DebugEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *DebugEvent) void { + switch (ev.state) { + .unset => ev.state = .set, + .set => {}, + .waited => unreachable, // Not allowed to call `set` until `reset`. + } + } + + pub fn wait(ev: *DebugEvent) void { + switch (ev.state) { + .unset => unreachable, // Deadlock detected. + .set => return, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + switch (ev.state) { + .unset => return .timed_out, + .set => return .event_set, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn reset(ev: *DebugEvent) void { + ev.state = .unset; + } +}; + +pub const AtomicEvent = struct { + waiters: u32 = 0, + + const WAKE = 1 << 0; + const WAIT = 1 << 1; + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn init(ev: *AtomicEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. 
+ pub fn deinit(ev: *AtomicEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *AtomicEvent) void { + const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); + if (waiters >= WAIT) { + return Futex.wake(&ev.waiters, waiters >> 1); + } + } + + pub fn wait(ev: *AtomicEvent) void { + switch (ev.timedWait(null)) { + .timed_out => unreachable, + .event_set => return, + } + } + + pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { + var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); + while (waiters != WAKE) { + waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { + if (Futex.wait(&ev.waiters, timeout)) |_| { + return .event_set; + } else |_| { + return .timed_out; + } + }; + } + return .event_set; + } + + pub fn reset(ev: *AtomicEvent) void { + @atomicStore(u32, &ev.waiters, 0, .Monotonic); + } + + pub const Futex = switch (std.Target.current.os.tag) { + .windows => WindowsFutex, + .linux => LinuxFutex, + else => SpinFutex, + }; + + pub const SpinFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void {} + + fn wait(waiters: *u32, timeout: ?u64) !void { + var timer: time.Timer = undefined; + if (timeout != null) + timer = time.Timer.start() catch return error.TimedOut; + + while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { + SpinLock.yield(); + if (timeout) |timeout_ns| { + if (timer.read() >= timeout_ns) + return error.TimedOut; + } + } + } + }; + + pub const LinuxFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void { + const waiting = std.math.maxInt(i32); // wake_count + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); + assert(linux.getErrno(rc) == 0); + } + + fn wait(waiters: *u32, timeout: ?u64) !void { + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); 
+ ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (true) { + const waiting = @atomicLoad(u32, waiters, .Acquire); + if (waiting == WAKE) + return; + const expected = @intCast(i32, waiting); + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); + switch (linux.getErrno(rc)) { + 0 => continue, + os.ETIMEDOUT => return error.TimedOut, + os.EINTR => continue, + os.EAGAIN => return, + else => unreachable, + } + } + } + }; + + pub const WindowsFutex = struct { + pub fn wake(waiters: *u32, wake_count: u32) void { + const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); + const key = @ptrCast(*const c_void, waiters); + + var waiting = wake_count; + while (waiting != 0) : (waiting -= 1) { + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .SUCCESS); + } + } + + pub fn wait(waiters: *u32, timeout: ?u64) !void { + const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); + const key = @ptrCast(*const c_void, waiters); + + // NT uses timeouts in units of 100ns with negative value being relative + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); + } + + // NtWaitForKeyedEvent doesnt have spurious wake-ups + var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + switch (rc) { + .TIMEOUT => { + // update the wait count to signal that we're not waiting anymore. + // if the .set() thread already observed that we are, perform a + // matching NtWaitForKeyedEvent so that the .set() thread doesn't + // deadlock trying to run NtReleaseKeyedEvent above. 
+ var waiting = @atomicLoad(u32, waiters, .Monotonic); + while (true) { + if (waiting == WAKE) { + rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .WAIT_0); + break; + } else { + waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; + continue; + } + } + return error.TimedOut; + }, + .WAIT_0 => {}, + else => unreachable, + } + } + + var event_handle: usize = EMPTY; + const EMPTY = ~@as(usize, 0); + const LOADING = EMPTY - 1; + + pub fn getEventHandle() ?windows.HANDLE { + var handle = @atomicLoad(usize, &event_handle, .Monotonic); + while (true) { + switch (handle) { + EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { + const handle_ptr = @ptrCast(*windows.HANDLE, &handle); + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) + handle = 0; + @atomicStore(usize, &event_handle, handle, .Monotonic); + return @intToPtr(?windows.HANDLE, handle); + }, + LOADING => { + SpinLock.yield(); + handle = @atomicLoad(usize, &event_handle, .Monotonic); + }, + else => { + return @intToPtr(?windows.HANDLE, handle); + }, + } + } + } + }; +}; + +test "basic usage" { + var event = StaticResetEvent{}; + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (std.builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128 = 0, + in: StaticResetEvent = .{}, + out: StaticResetEvent = .{}, + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and 
signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context = Context{}; + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. 
+ // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/atomic/queue.zig b/lib/std/atomic/queue.zig index fa3711cd9f..f5f63944ab 100644 --- a/lib/std/atomic/queue.zig +++ b/lib/std/atomic/queue.zig @@ -16,7 +16,7 @@ pub fn Queue(comptime T: type) type { return struct { head: ?*Node, tail: ?*Node, - mutex: std.Mutex, + mutex: std.Thread.Mutex, pub const Self = @This(); pub const Node = std.TailQueue(T).Node; @@ -27,7 +27,7 @@ pub fn Queue(comptime T: type) type { return Self{ .head = null, .tail = null, - .mutex = std.Mutex{}, + .mutex = std.Thread.Mutex{}, }; } diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig deleted file mode 100644 index 39cd184a68..0000000000 --- a/lib/std/auto_reset_event.zig +++ /dev/null @@ -1,226 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const assert = std.debug.assert; -const StaticResetEvent = std.StaticResetEvent; - -/// Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. -/// Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). -pub const AutoResetEvent = struct { - /// AutoResetEvent has 3 possible states: - /// - UNSET: the AutoResetEvent is currently unset - /// - SET: the AutoResetEvent was notified before a wait() was called - /// - : there is an active waiter waiting for a notification. 
- /// - /// When attempting to wait: - /// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set - /// if the event is already set, then it consumes the notification and resets the event. - /// - /// When attempting to notify: - /// if the event is unset, then we set the event - /// if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent - /// - /// This ensures that the event is automatically reset after a wait() has been issued - /// and avoids the race condition when using StaticResetEvent in the following scenario: - /// thread 1 | thread 2 - /// StaticResetEvent.wait() | - /// | StaticResetEvent.set() - /// | StaticResetEvent.set() - /// StaticResetEvent.reset() | - /// StaticResetEvent.wait() | (missed the second .set() notification above) - state: usize = UNSET, - - const UNSET = 0; - const SET = 1; - - /// the minimum alignment for the `*StaticResetEvent` created by wait*() - const event_align = std.math.max(@alignOf(StaticResetEvent), 2); - - pub fn wait(self: *AutoResetEvent) void { - self.waitFor(null) catch unreachable; - } - - pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { - return self.waitFor(timeout); - } - - fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { - // lazily initialized StaticResetEvent - var reset_event: StaticResetEvent align(event_align) = undefined; - var has_reset_event = false; - - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // consume a notification if there is any - if (state == SET) { - @atomicStore(usize, &self.state, UNSET, .SeqCst); - return; - } - - // check if theres currently a pending ResetEvent pointer already registered - if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent - } - - // lazily initialize the ResetEvent if it hasn't been already - if (!has_reset_event) { - has_reset_event = true; - reset_event = .{}; - } - - // Since the 
AutoResetEvent currently isnt set, - // try to register our ResetEvent on it to wait - // for a set() call from another thread. - if (@cmpxchgWeak( - usize, - &self.state, - UNSET, - @ptrToInt(&reset_event), - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - // if no timeout was specified, then just wait forever - const timeout_ns = timeout orelse { - reset_event.wait(); - return; - }; - - // wait with a timeout and return if signalled via set() - switch (reset_event.timedWait(timeout_ns)) { - .event_set => return, - .timed_out => {}, - } - - // If we timed out, we need to transition the AutoResetEvent back to UNSET. - // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. - state = @cmpxchgStrong( - usize, - &self.state, - @ptrToInt(&reset_event), - UNSET, - .SeqCst, - .SeqCst, - ) orelse return error.TimedOut; - - // We didn't manage to unregister ourselves from the state. - if (state == SET) { - unreachable; // AutoResetEvent notified without waking up the waiting thread - } else if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out - } - - // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. - // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. - // We don't return error.TimedOut here as it technically notified us while we were "timing out". - reset_event.wait(); - return; - } - } - - pub fn set(self: *AutoResetEvent) void { - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // If the AutoResetEvent is already set, there is nothing else left to do - if (state == SET) { - return; - } - - // If the AutoResetEvent isn't set, - // then try to leave a notification for the wait() thread that we set() it. 
- if (state == UNSET) { - state = @cmpxchgWeak( - usize, - &self.state, - UNSET, - SET, - .SeqCst, - .SeqCst, - ) orelse return; - continue; - } - - // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. - // Try to acquire ownership of it so that we can wake it up. - // This also resets the AutoResetEvent so that there is no race condition as defined above. - if (@cmpxchgWeak( - usize, - &self.state, - state, - UNSET, - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); - reset_event.set(); - return; - } - } -}; - -test "std.AutoResetEvent" { - // test local code paths - { - var event = AutoResetEvent{}; - testing.expectError(error.TimedOut, event.timedWait(1)); - event.set(); - event.wait(); - } - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - value: u128 = 0, - in: AutoResetEvent = AutoResetEvent{}, - out: AutoResetEvent = AutoResetEvent{}, - - const Self = @This(); - - fn sender(self: *Self) void { - testing.expect(self.value == 0); - self.value = 1; - self.out.set(); - - self.in.wait(); - testing.expect(self.value == 2); - self.value = 3; - self.out.set(); - - self.in.wait(); - testing.expect(self.value == 4); - } - - fn receiver(self: *Self) void { - self.out.wait(); - testing.expect(self.value == 1); - self.value = 2; - self.in.set(); - - self.out.wait(); - testing.expect(self.value == 3); - self.value = 4; - self.in.set(); - } - }; - - var context = Context{}; - const send_thread = try std.Thread.spawn(&context, Context.sender); - const recv_thread = try std.Thread.spawn(&context, Context.receiver); - - send_thread.wait(); - recv_thread.wait(); -} diff --git a/lib/std/c.zig b/lib/std/c.zig index b7a412339e..1e86bfbd8c 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -338,6 +338,13 @@ pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int; pub extern "c" 
fn pthread_cond_broadcast(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int; +pub extern "c" fn pthread_rwlock_destroy(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_rdlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_wrlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_tryrdlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_trywrlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_unlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; + pub const pthread_t = *opaque {}; pub const FILE = opaque {}; diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig index 7527752527..b5c3fbf977 100644 --- a/lib/std/c/darwin.zig +++ b/lib/std/c/darwin.zig @@ -177,6 +177,10 @@ pub const pthread_cond_t = extern struct { __sig: c_long = 0x3CB0B1BB, __opaque: [__PTHREAD_COND_SIZE__]u8 = [_]u8{0} ** __PTHREAD_COND_SIZE__, }; +pub const pthread_rwlock_t = extern struct { + __sig: c_long = 0x2DA8B3B4, + __opaque: [192]u8 = [_]u8{0} ** 192, +}; pub const sem_t = c_int; const __PTHREAD_MUTEX_SIZE__ = if (@sizeOf(usize) == 8) 56 else 40; const __PTHREAD_COND_SIZE__ = if (@sizeOf(usize) == 8) 40 else 24; @@ -192,7 +196,7 @@ pub extern "c" fn pthread_threadid_np(thread: ?pthread_t, thread_id: *u64) c_int pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void; // Grand Central Dispatch is exposed by libSystem. 
-pub const dispatch_semaphore_t = *opaque{}; +pub const dispatch_semaphore_t = *opaque {}; pub const dispatch_time_t = u64; pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0); pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0); diff --git a/lib/std/c/emscripten.zig b/lib/std/c/emscripten.zig index 1652975eb9..526eb9e99c 100644 --- a/lib/std/c/emscripten.zig +++ b/lib/std/c/emscripten.zig @@ -9,5 +9,8 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = extern struct { + size: [32]u8 align(4) = [_]u8{0} ** 32, +}; const __SIZEOF_PTHREAD_COND_T = 48; const __SIZEOF_PTHREAD_MUTEX_T = 28; diff --git a/lib/std/c/freebsd.zig b/lib/std/c/freebsd.zig index a8d11b95b2..a6c84c66fa 100644 --- a/lib/std/c/freebsd.zig +++ b/lib/std/c/freebsd.zig @@ -43,6 +43,9 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: ?*c_void = null, }; +pub const pthread_rwlock_t = extern struct { + ptr: ?*c_void = null, +}; pub const pthread_attr_t = extern struct { __size: [56]u8, diff --git a/lib/std/c/fuchsia.zig b/lib/std/c/fuchsia.zig index bc53dc81a6..fc34f49d22 100644 --- a/lib/std/c/fuchsia.zig +++ b/lib/std/c/fuchsia.zig @@ -9,5 +9,8 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = extern struct { + size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, +}; const __SIZEOF_PTHREAD_COND_T = 48; const __SIZEOF_PTHREAD_MUTEX_T = 40; diff --git a/lib/std/c/haiku.zig b/lib/std/c/haiku.zig index 438012c3b3..0f695a9446 100644 --- a/lib/std/c/haiku.zig +++ b/lib/std/c/haiku.zig @@ -17,3 +17,12 @@ pub const pthread_cond_t = extern struct { waiter_count: i32 = 0, lock: i32 = 0, }; +pub const pthread_rwlock_t = extern 
struct { + flags: u32 = 0, + owner: i32 = -1, + lock_sem: i32 = 0, + lock_count: i32 = 0, + reader_count: i32 = 0, + writer_count: i32 = 0, + waiters: [2]?*c_void = [_]?*c_void{ null, null }, +}; diff --git a/lib/std/c/hermit.zig b/lib/std/c/hermit.zig index fa351bc0db..a159395ab3 100644 --- a/lib/std/c/hermit.zig +++ b/lib/std/c/hermit.zig @@ -9,3 +9,6 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: usize = ~@as(usize, 0), }; +pub const pthread_rwlock_t = extern struct { + ptr: usize = std.math.maxInt(usize), +}; diff --git a/lib/std/c/linux.zig b/lib/std/c/linux.zig index db464d7a6d..fbfabdd568 100644 --- a/lib/std/c/linux.zig +++ b/lib/std/c/linux.zig @@ -123,6 +123,32 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = switch (std.builtin.abi) { + .android => switch (@sizeOf(usize)) { + 4 => extern struct { + lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, + cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, + numLocks: c_int = 0, + writerThreadId: c_int = 0, + pendingReaders: c_int = 0, + pendingWriters: c_int = 0, + attr: i32 = 0, + __reserved: [12]u8 = [_]u8{0} ** 2, + }, + 8 => extern struct { + numLocks: c_int = 0, + writerThreadId: c_int = 0, + pendingReaders: c_int = 0, + pendingWriters: c_int = 0, + attr: i32 = 0, + __reserved: [36]u8 = [_]u8{0} ** 36, + }, + else => unreachable, + }, + else => extern struct { + size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, + }, +}; pub const sem_t = extern struct { __size: [__SIZEOF_SEM_T]u8 align(@alignOf(usize)), }; diff --git a/lib/std/c/netbsd.zig b/lib/std/c/netbsd.zig index 258ee6a5c6..46a38706f4 100644 --- a/lib/std/c/netbsd.zig +++ b/lib/std/c/netbsd.zig @@ -54,6 +54,22 @@ pub const pthread_cond_t = extern struct { ptc_private: ?*c_void = null, }; +pub const 
pthread_rwlock_t = extern struct { + ptr_magic: c_uint = 0x99990009, + ptr_interlock: switch (std.builtin.arch) { + .aarch64, .sparc, .x86_64, .i386 => u8, + .arm, .powerpc => c_int, + else => unreachable, + } = 0, + ptr_rblocked_first: ?*u8 = null, + ptr_rblocked_last: ?*u8 = null, + ptr_wblocked_first: ?*u8 = null, + ptr_wblocked_last: ?*u8 = null, + ptr_nreaders: c_uint = 0, + ptr_owner: std.c.pthread_t = null, + ptr_private: ?*c_void = null, +}; + const pthread_spin_t = switch (builtin.arch) { .aarch64, .aarch64_be, .aarch64_32 => u8, .mips, .mipsel, .mips64, .mips64el => u32, diff --git a/lib/std/c/openbsd.zig b/lib/std/c/openbsd.zig index dd89c837ff..99debf57e7 100644 --- a/lib/std/c/openbsd.zig +++ b/lib/std/c/openbsd.zig @@ -27,6 +27,9 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: ?*c_void = null, }; +pub const pthread_rwlock_t = extern struct { + ptr: ?*c_void = null, +}; pub const pthread_spinlock_t = extern struct { inner: ?*c_void = null, }; diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 15d3baa1d0..e4ef25724b 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -50,7 +50,7 @@ pub const LineInfo = struct { } }; -var stderr_mutex = std.Mutex{}; +var stderr_mutex = std.Thread.Mutex{}; /// Deprecated. Use `std.log` functions for logging or `std.debug.print` for /// "printf debugging". @@ -65,7 +65,7 @@ pub fn print(comptime fmt: []const u8, args: anytype) void { nosuspend stderr.print(fmt, args) catch return; } -pub fn getStderrMutex() *std.Mutex { +pub fn getStderrMutex() *std.Thread.Mutex { return &stderr_mutex; } @@ -235,7 +235,7 @@ pub fn panic(comptime format: []const u8, args: anytype) noreturn { var panicking: u8 = 0; // Locked to avoid interleaving panic messages from multiple threads. -var panic_mutex = std.Mutex{}; +var panic_mutex = std.Thread.Mutex{}; /// Counts how many times the panic handler is invoked by this thread. 
/// This is used to catch and handle panics triggered by the panic handler. @@ -280,7 +280,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c // and call abort() // Sleep forever without hammering the CPU - var event: std.StaticResetEvent = .{}; + var event: std.Thread.StaticResetEvent = .{}; event.wait(); unreachable; } diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig index 17d79c753b..d48c6c1520 100644 --- a/lib/std/event/lock.zig +++ b/lib/std/event/lock.zig @@ -16,7 +16,7 @@ const Loop = std.event.Loop; /// Allows only one actor to hold the lock. /// TODO: make this API also work in blocking I/O mode. pub const Lock = struct { - mutex: std.Mutex = std.Mutex{}, + mutex: std.Thread.Mutex = std.Thread.Mutex{}, head: usize = UNLOCKED, const UNLOCKED = 0; diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 8101d27a55..492b7c1758 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -29,7 +29,7 @@ pub const Loop = struct { fs_thread: *Thread, fs_queue: std.atomic.Queue(Request), fs_end_request: Request.Node, - fs_thread_wakeup: std.ResetEvent, + fs_thread_wakeup: std.Thread.ResetEvent, /// For resources that have the same lifetime as the `Loop`. /// This is only used by `Loop` for the thread pool and associated resources. @@ -785,7 +785,7 @@ pub const Loop = struct { timer: std.time.Timer, waiters: Waiters, thread: *std.Thread, - event: std.AutoResetEvent, + event: std.Thread.AutoResetEvent, is_running: bool, /// Initialize the delay queue by spawning the timer thread @@ -796,7 +796,7 @@ pub const Loop = struct { .waiters = DelayQueue.Waiters{ .entries = std.atomic.Queue(anyframe).init(), }, - .event = std.AutoResetEvent{}, + .event = std.Thread.AutoResetEvent{}, .is_running = true, // Must be last so that it can read the other state, such as `is_running`. 
.thread = try std.Thread.spawn(self, DelayQueue.run), diff --git a/lib/std/event/wait_group.zig b/lib/std/event/wait_group.zig index d123f7df27..0b83c18c74 100644 --- a/lib/std/event/wait_group.zig +++ b/lib/std/event/wait_group.zig @@ -30,7 +30,7 @@ pub fn WaitGroupGeneric(comptime counter_size: u16) type { return struct { counter: CounterType = 0, max_counter: CounterType = std.math.maxInt(CounterType), - mutex: std.Mutex = .{}, + mutex: std.Thread.Mutex = .{}, waiters: ?*Waiter = null, const Waiter = struct { next: ?*Waiter, diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 9afcc50f3c..e9f28a0b60 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -750,7 +750,7 @@ test "open file with exclusive lock twice, make sure it waits" { errdefer file.close(); const S = struct { - const C = struct { dir: *fs.Dir, evt: *std.ResetEvent }; + const C = struct { dir: *fs.Dir, evt: *std.Thread.ResetEvent }; fn checkFn(ctx: C) !void { const file1 = try ctx.dir.createFile(filename, .{ .lock = .Exclusive }); defer file1.close(); @@ -758,7 +758,7 @@ test "open file with exclusive lock twice, make sure it waits" { } }; - var evt: std.ResetEvent = undefined; + var evt: std.Thread.ResetEvent = undefined; try evt.init(); defer evt.deinit(); diff --git a/lib/std/heap/general_purpose_allocator.zig b/lib/std/heap/general_purpose_allocator.zig index 415b206b6a..05f05c1da3 100644 --- a/lib/std/heap/general_purpose_allocator.zig +++ b/lib/std/heap/general_purpose_allocator.zig @@ -149,12 +149,12 @@ pub const Config = struct { thread_safe: bool = !std.builtin.single_threaded, /// What type of mutex you'd like to use, for thread safety. - /// when specfied, the mutex type must have the same shape as `std.Mutex` and + /// when specfied, the mutex type must have the same shape as `std.Thread.Mutex` and /// `std.mutex.Dummy`, and have no required fields. Specifying this field causes /// the `thread_safe` field to be ignored. 
/// /// when null (default): - /// * the mutex type defaults to `std.Mutex` when thread_safe is enabled. + /// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled. /// * the mutex type defaults to `std.mutex.Dummy` otherwise. MutexType: ?type = null, @@ -187,7 +187,7 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type { const mutex_init = if (config.MutexType) |T| T{} else if (config.thread_safe) - std.Mutex{} + std.Thread.Mutex{} else std.mutex.Dummy{}; @@ -869,9 +869,9 @@ test "realloc large object to small object" { } test "overrideable mutexes" { - var gpa = GeneralPurposeAllocator(.{ .MutexType = std.Mutex }){ + var gpa = GeneralPurposeAllocator(.{ .MutexType = std.Thread.Mutex }){ .backing_allocator = std.testing.allocator, - .mutex = std.Mutex{}, + .mutex = std.Thread.Mutex{}, }; defer std.testing.expect(!gpa.deinit()); const allocator = &gpa.allocator; diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig deleted file mode 100644 index 50bbb40bf0..0000000000 --- a/lib/std/mutex.zig +++ /dev/null @@ -1,379 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const os = std.os; -const assert = std.debug.assert; -const windows = os.windows; -const testing = std.testing; -const SpinLock = std.SpinLock; -const StaticResetEvent = std.StaticResetEvent; - -/// Lock may be held only once. If the same thread tries to acquire -/// the same mutex twice, it deadlocks. This type supports static -/// initialization and is at most `@sizeOf(usize)` in size. When an -/// application is built in single threaded release mode, all the -/// functions are no-ops. In single threaded debug mode, there is -/// deadlock detection. 
-/// -/// Example usage: -/// var m = Mutex{}; -/// -/// const lock = m.acquire(); -/// defer lock.release(); -/// ... critical code -/// -/// Non-blocking: -/// if (m.tryAcquire) |lock| { -/// defer lock.release(); -/// // ... critical section -/// } else { -/// // ... lock not acquired -/// } -pub const Mutex = if (builtin.single_threaded) - Dummy -else if (builtin.os.tag == .windows) - WindowsMutex -else if (std.Thread.use_pthreads) - PthreadMutex -else if (builtin.link_libc or builtin.os.tag == .linux) - // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs - struct { - state: usize = 0, - - /// number of times to spin trying to acquire the lock. - /// https://webkit.org/blog/6161/locking-in-webkit/ - const SPIN_COUNT = 40; - - const MUTEX_LOCK: usize = 1 << 0; - const QUEUE_LOCK: usize = 1 << 1; - const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK); - - const Node = struct { - next: ?*Node, - event: StaticResetEvent, - }; - - pub fn tryAcquire(self: *Mutex) ?Held { - if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null) - return null; - return Held{ .mutex = self }; - } - - pub fn acquire(self: *Mutex) Held { - return self.tryAcquire() orelse { - self.acquireSlow(); - return Held{ .mutex = self }; - }; - } - - fn acquireSlow(self: *Mutex) void { - // inlining the fast path and hiding *Slow() - // calls behind a @setCold(true) appears to - // improve performance in release builds. 
- @setCold(true); - while (true) { - - // try and spin for a bit to acquire the mutex if theres currently no queue - var spin_count: u32 = SPIN_COUNT; - var state = @atomicLoad(usize, &self.state, .Monotonic); - while (spin_count != 0) : (spin_count -= 1) { - if (state & MUTEX_LOCK == 0) { - _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return; - } else if (state & QUEUE_MASK == 0) { - break; - } - SpinLock.yield(); - state = @atomicLoad(usize, &self.state, .Monotonic); - } - - // create the StaticResetEvent node on the stack - // (faster than threadlocal on platforms like OSX) - var node: Node = .{ - .next = undefined, - .event = .{}, - }; - - // we've spun too long, try and add our node to the LIFO queue. - // if the mutex becomes available in the process, try and grab it instead. - while (true) { - if (state & MUTEX_LOCK == 0) { - _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return; - } else { - node.next = @intToPtr(?*Node, state & QUEUE_MASK); - const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK); - _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse { - node.event.wait(); - break; - }; - } - SpinLock.yield(); - state = @atomicLoad(usize, &self.state, .Monotonic); - } - } - } - - /// Returned when the lock is acquired. Call release to - /// release. - pub const Held = struct { - mutex: *Mutex, - - /// Release the held lock. - pub fn release(self: Held) void { - // first, remove the lock bit so another possibly parallel acquire() can succeed. - // use .Sub since it can be usually compiled down more efficiency - // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86) - const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release); - - // if the LIFO queue isnt locked and it has a node, try and wake up the node. 
- if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0) - self.mutex.releaseSlow(); - } - }; - - fn releaseSlow(self: *Mutex) void { - @setCold(true); - - // try and lock the LFIO queue to pop a node off, - // stopping altogether if its already locked or the queue is empty - var state = @atomicLoad(usize, &self.state, .Monotonic); - while (true) : (SpinLock.loopHint(1)) { - if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0) - return; - state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break; - } - - // acquired the QUEUE_LOCK, try and pop a node to wake it. - // if the mutex is locked, then unset QUEUE_LOCK and let - // the thread who holds the mutex do the wake-up on unlock() - while (true) : (SpinLock.loopHint(1)) { - if ((state & MUTEX_LOCK) != 0) { - state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return; - } else { - const node = @intToPtr(*Node, state & QUEUE_MASK); - const new_state = @ptrToInt(node.next); - state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse { - node.event.set(); - return; - }; - } - } - } - } - - // for platforms without a known OS blocking - // primitive, default to SpinLock for correctness -else - SpinLock; - -pub const PthreadMutex = struct { - pthread_mutex: std.c.pthread_mutex_t = init, - - pub const Held = struct { - mutex: *PthreadMutex, - - pub fn release(self: Held) void { - switch (std.c.pthread_mutex_unlock(&self.mutex.pthread_mutex)) { - 0 => return, - std.c.EINVAL => unreachable, - std.c.EAGAIN => unreachable, - std.c.EPERM => unreachable, - else => unreachable, - } - } - }; - - /// Create a new mutex in unlocked state. - pub const init = std.c.PTHREAD_MUTEX_INITIALIZER; - - /// Try to acquire the mutex without blocking. Returns null if - /// the mutex is unavailable. Otherwise returns Held. Call - /// release on Held. 
- pub fn tryAcquire(self: *PthreadMutex) ?Held { - if (std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0) { - return Held{ .mutex = self }; - } else { - return null; - } - } - - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. - pub fn acquire(self: *PthreadMutex) Held { - switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { - 0 => return Held{ .mutex = self }, - std.c.EINVAL => unreachable, - std.c.EBUSY => unreachable, - std.c.EAGAIN => unreachable, - std.c.EDEADLK => unreachable, - std.c.EPERM => unreachable, - else => unreachable, - } - } -}; - -/// This has the sematics as `Mutex`, however it does not actually do any -/// synchronization. Operations are safety-checked no-ops. -pub const Dummy = struct { - lock: @TypeOf(lock_init) = lock_init, - - const lock_init = if (std.debug.runtime_safety) false else {}; - - pub const Held = struct { - mutex: *Dummy, - - pub fn release(self: Held) void { - if (std.debug.runtime_safety) { - self.mutex.lock = false; - } - } - }; - - /// Create a new mutex in unlocked state. - pub const init = Dummy{}; - - /// Try to acquire the mutex without blocking. Returns null if - /// the mutex is unavailable. Otherwise returns Held. Call - /// release on Held. - pub fn tryAcquire(self: *Dummy) ?Held { - if (std.debug.runtime_safety) { - if (self.lock) return null; - self.lock = true; - } - return Held{ .mutex = self }; - } - - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. 
- pub fn acquire(self: *Dummy) Held { - return self.tryAcquire() orelse @panic("deadlock detected"); - } -}; - -// https://locklessinc.com/articles/keyed_events/ -const WindowsMutex = struct { - state: State = State{ .waiters = 0 }, - - const State = extern union { - locked: u8, - waiters: u32, - }; - - const WAKE = 1 << 8; - const WAIT = 1 << 9; - - pub fn tryAcquire(self: *WindowsMutex) ?Held { - if (@atomicRmw(u8, &self.state.locked, .Xchg, 1, .Acquire) != 0) - return null; - return Held{ .mutex = self }; - } - - pub fn acquire(self: *WindowsMutex) Held { - return self.tryAcquire() orelse self.acquireSlow(); - } - - fn acquireSpinning(self: *WindowsMutex) Held { - @setCold(true); - while (true) : (SpinLock.yield()) { - return self.tryAcquire() orelse continue; - } - } - - fn acquireSlow(self: *WindowsMutex) Held { - // try to use NT keyed events for blocking, falling back to spinlock if unavailable - @setCold(true); - const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); - const key = @ptrCast(*const c_void, &self.state.waiters); - - while (true) : (SpinLock.loopHint(1)) { - const waiters = @atomicLoad(u32, &self.state.waiters, .Monotonic); - - // try and take lock if unlocked - if ((waiters & 1) == 0) { - if (@atomicRmw(u8, &self.state.locked, .Xchg, 1, .Acquire) == 0) { - return Held{ .mutex = self }; - } - - // otherwise, try and update the waiting count. - // then unset the WAKE bit so that another unlocker can wake up a thread. 
- } else if (@cmpxchgWeak(u32, &self.state.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) { - const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - _ = @atomicRmw(u32, &self.state.waiters, .Sub, WAKE, .Monotonic); - } - } - } - - pub const Held = struct { - mutex: *WindowsMutex, - - pub fn release(self: Held) void { - // unlock without a rmw/cmpxchg instruction - @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return; - const key = @ptrCast(*const c_void, &self.mutex.state.waiters); - - while (true) : (SpinLock.loopHint(1)) { - const waiters = @atomicLoad(u32, &self.mutex.state.waiters, .Monotonic); - - // no one is waiting - if (waiters < WAIT) return; - // someone grabbed the lock and will do the wake instead - if (waiters & 1 != 0) return; - // someone else is currently waking up - if (waiters & WAKE != 0) return; - - // try to decrease the waiter count & set the WAKE bit meaning a thread is waking up - if (@cmpxchgWeak(u32, &self.mutex.state.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - return; - } - } - } - }; -}; - -const TestContext = struct { - mutex: *Mutex, - data: i128, - - const incr_count = 10000; -}; - -test "std.Mutex" { - var mutex = Mutex{}; - - var context = TestContext{ - .mutex = &mutex, - .data = 0, - }; - - if (builtin.single_threaded) { - worker(&context); - testing.expect(context.data == TestContext.incr_count); - } else { - const thread_count = 10; - var threads: [thread_count]*std.Thread = undefined; - for (threads) |*t| { - t.* = try std.Thread.spawn(&context, worker); - } - for (threads) |t| - t.wait(); - - testing.expect(context.data == thread_count * TestContext.incr_count); - } -} - -fn worker(ctx: *TestContext) void { - var i: 
usize = 0; - while (i != TestContext.incr_count) : (i += 1) { - const held = ctx.mutex.acquire(); - defer held.release(); - - ctx.data += 1; - } -} diff --git a/lib/std/once.zig b/lib/std/once.zig index f4ac47f8d8..efa99060d3 100644 --- a/lib/std/once.zig +++ b/lib/std/once.zig @@ -15,7 +15,7 @@ pub fn once(comptime f: fn () void) Once(f) { pub fn Once(comptime f: fn () void) type { return struct { done: bool = false, - mutex: std.Mutex = std.Mutex{}, + mutex: std.Thread.Mutex = std.Thread.Mutex{}, /// Call the function `f`. /// If `call` is invoked multiple times `f` will be executed only the diff --git a/lib/std/os/windows/bits.zig b/lib/std/os/windows/bits.zig index d8a2fb4a4d..8461378da0 100644 --- a/lib/std/os/windows/bits.zig +++ b/lib/std/os/windows/bits.zig @@ -1635,3 +1635,8 @@ pub const OBJECT_NAME_INFORMATION = extern struct { Name: UNICODE_STRING, }; pub const POBJECT_NAME_INFORMATION = *OBJECT_NAME_INFORMATION; + +pub const SRWLOCK = usize; +pub const SRWLOCK_INIT: SRWLOCK = 0; +pub const CONDITION_VARIABLE = usize; +pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; diff --git a/lib/std/os/windows/kernel32.zig b/lib/std/os/windows/kernel32.zig index 78d2a6501e..ec4a75afa9 100644 --- a/lib/std/os/windows/kernel32.zig +++ b/lib/std/os/windows/kernel32.zig @@ -289,3 +289,16 @@ pub extern "kernel32" fn K32QueryWorkingSet(hProcess: HANDLE, pv: PVOID, cb: DWO pub extern "kernel32" fn K32QueryWorkingSetEx(hProcess: HANDLE, pv: PVOID, cb: DWORD) callconv(WINAPI) BOOL; pub extern "kernel32" fn FlushFileBuffers(hFile: HANDLE) callconv(WINAPI) BOOL; + +pub extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(WINAPI) void; +pub extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(WINAPI) void; +pub extern "kernel32" fn SleepConditionVariableSRW( + c: *CONDITION_VARIABLE, + s: *SRWLOCK, + t: DWORD, + f: ULONG, +) callconv(WINAPI) BOOL; + +pub extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) 
callconv(WINAPI) BOOL; +pub extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(WINAPI) void; +pub extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(WINAPI) void; diff --git a/lib/std/std.zig b/lib/std/std.zig index d736899a45..d085d4fc41 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -13,7 +13,6 @@ pub const AutoArrayHashMap = array_hash_map.AutoArrayHashMap; pub const AutoArrayHashMapUnmanaged = array_hash_map.AutoArrayHashMapUnmanaged; pub const AutoHashMap = hash_map.AutoHashMap; pub const AutoHashMapUnmanaged = hash_map.AutoHashMapUnmanaged; -pub const AutoResetEvent = @import("auto_reset_event.zig").AutoResetEvent; pub const BufMap = @import("buf_map.zig").BufMap; pub const BufSet = @import("buf_set.zig").BufSet; pub const ChildProcess = @import("child_process.zig").ChildProcess; @@ -21,26 +20,21 @@ pub const ComptimeStringMap = @import("comptime_string_map.zig").ComptimeStringM pub const DynLib = @import("dynamic_library.zig").DynLib; pub const HashMap = hash_map.HashMap; pub const HashMapUnmanaged = hash_map.HashMapUnmanaged; -pub const mutex = @import("mutex.zig"); -pub const Mutex = mutex.Mutex; pub const PackedIntArray = @import("packed_int_array.zig").PackedIntArray; pub const PackedIntArrayEndian = @import("packed_int_array.zig").PackedIntArrayEndian; pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("Progress.zig"); -pub const ResetEvent = @import("ResetEvent.zig"); pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; -pub const SpinLock = @import("SpinLock.zig"); -pub const StaticResetEvent = @import("StaticResetEvent.zig"); pub const StringHashMap = hash_map.StringHashMap; pub const StringHashMapUnmanaged = 
hash_map.StringHashMapUnmanaged; pub const StringArrayHashMap = array_hash_map.StringArrayHashMap; pub const StringArrayHashMapUnmanaged = array_hash_map.StringArrayHashMapUnmanaged; pub const TailQueue = @import("linked_list.zig").TailQueue; pub const Target = @import("target.zig").Target; -pub const Thread = @import("thread.zig").Thread; +pub const Thread = @import("Thread.zig"); pub const array_hash_map = @import("array_hash_map.zig"); pub const atomic = @import("atomic.zig"); @@ -98,12 +92,7 @@ test "" { // server is hitting OOM. TODO revert this after stage2 arrives. _ = ChildProcess; _ = DynLib; - _ = mutex; - _ = Mutex; _ = Progress; - _ = ResetEvent; - _ = SpinLock; - _ = StaticResetEvent; _ = Target; _ = Thread; diff --git a/lib/std/thread.zig b/lib/std/thread.zig deleted file mode 100644 index 0fee13b057..0000000000 --- a/lib/std/thread.zig +++ /dev/null @@ -1,526 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = std.builtin; -const os = std.os; -const mem = std.mem; -const windows = std.os.windows; -const c = std.c; -const assert = std.debug.assert; - -const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; - -pub const Thread = struct { - data: Data, - - pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc; - - /// Represents a kernel thread handle. - /// May be an integer or a pointer depending on the platform. - /// On Linux and POSIX, this is the same as Id. - pub const Handle = if (use_pthreads) - c.pthread_t - else switch (std.Target.current.os.tag) { - .linux => i32, - .windows => windows.HANDLE, - else => void, - }; - - /// Represents a unique ID per thread. 
- /// May be an integer or pointer depending on the platform. - /// On Linux and POSIX, this is the same as Handle. - pub const Id = switch (std.Target.current.os.tag) { - .windows => windows.DWORD, - else => Handle, - }; - - pub const Data = if (use_pthreads) - struct { - handle: Thread.Handle, - memory: []u8, - } - else switch (std.Target.current.os.tag) { - .linux => struct { - handle: Thread.Handle, - memory: []align(mem.page_size) u8, - }, - .windows => struct { - handle: Thread.Handle, - alloc_start: *c_void, - heap_handle: windows.HANDLE, - }, - else => struct {}, - }; - - /// Returns the ID of the calling thread. - /// Makes a syscall every time the function is called. - /// On Linux and POSIX, this Id is the same as a Handle. - pub fn getCurrentId() Id { - if (use_pthreads) { - return c.pthread_self(); - } else - return switch (std.Target.current.os.tag) { - .linux => os.linux.gettid(), - .windows => windows.kernel32.GetCurrentThreadId(), - else => @compileError("Unsupported OS"), - }; - } - - /// Returns the handle of this thread. - /// On Linux and POSIX, this is the same as Id. - /// On Linux, it is possible that the thread spawned with `spawn` - /// finishes executing entirely before the clone syscall completes. In this - /// case, this function will return 0 rather than the no-longer-existing thread's - /// pid. 
- pub fn handle(self: Thread) Handle { - return self.data.handle; - } - - pub fn wait(self: *Thread) void { - if (use_pthreads) { - const err = c.pthread_join(self.data.handle, null); - switch (err) { - 0 => {}, - os.EINVAL => unreachable, - os.ESRCH => unreachable, - os.EDEADLK => unreachable, - else => unreachable, - } - std.heap.c_allocator.free(self.data.memory); - std.heap.c_allocator.destroy(self); - } else switch (std.Target.current.os.tag) { - .linux => { - while (true) { - const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst); - if (pid_value == 0) break; - const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null); - switch (os.linux.getErrno(rc)) { - 0 => continue, - os.EINTR => continue, - os.EAGAIN => continue, - else => unreachable, - } - } - os.munmap(self.data.memory); - }, - .windows => { - windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable; - windows.CloseHandle(self.data.handle); - windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start); - }, - else => @compileError("Unsupported OS"), - } - } - - pub const SpawnError = error{ - /// A system-imposed limit on the number of threads was encountered. - /// There are a number of limits that may trigger this error: - /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), - /// which limits the number of processes and threads for a real - /// user ID, was reached; - /// * the kernel's system-wide limit on the number of processes and - /// threads, /proc/sys/kernel/threads-max, was reached (see - /// proc(5)); - /// * the maximum number of PIDs, /proc/sys/kernel/pid_max, was - /// reached (see proc(5)); or - /// * the PID limit (pids.max) imposed by the cgroup "process num‐ - /// ber" (PIDs) controller was reached. 
- ThreadQuotaExceeded, - - /// The kernel cannot allocate sufficient memory to allocate a task structure - /// for the child, or to copy those parts of the caller's context that need to - /// be copied. - SystemResources, - - /// Not enough userland memory to spawn the thread. - OutOfMemory, - - /// `mlockall` is enabled, and the memory needed to spawn the thread - /// would exceed the limit. - LockedMemoryLimitExceeded, - - Unexpected, - }; - - /// caller must call wait on the returned thread - /// fn startFn(@TypeOf(context)) T - /// where T is u8, noreturn, void, or !void - /// caller must call wait on the returned thread - pub fn spawn(context: anytype, comptime startFn: anytype) SpawnError!*Thread { - if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode"); - // TODO compile-time call graph analysis to determine stack upper bound - // https://github.com/ziglang/zig/issues/157 - const default_stack_size = 16 * 1024 * 1024; - - const Context = @TypeOf(context); - comptime assert(@typeInfo(@TypeOf(startFn)).Fn.args[0].arg_type.? 
== Context); - - if (std.Target.current.os.tag == .windows) { - const WinThread = struct { - const OuterContext = struct { - thread: Thread, - inner: Context, - }; - fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD { - const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), - } - } - }; - - const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; - const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); - const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory; - errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0); - const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count]; - const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; - outer_context.* = WinThread.OuterContext{ - .thread = Thread{ - .data = Thread.Data{ - .heap_handle = heap_handle, - .alloc_start = bytes_ptr, - .handle = undefined, - }, - }, - .inner = context, - }; - - const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner); - outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse { - switch (windows.kernel32.GetLastError()) { - else => |err| 
return windows.unexpectedError(err), - } - }; - return &outer_context.thread; - } - - const MainFuncs = struct { - fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 { - const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), - } - } - fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void { - const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return null; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - // pthreads don't support exit status, ignore value - _ = startFn(arg); - return null; - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return null; - }, - else => @compileError(bad_startfn_ret), - } - } - }; - - if (Thread.use_pthreads) { - var attr: c.pthread_attr_t = undefined; - if (c.pthread_attr_init(&attr) != 0) return error.SystemResources; - defer assert(c.pthread_attr_destroy(&attr) == 0); - - const thread_obj = try std.heap.c_allocator.create(Thread); - errdefer std.heap.c_allocator.destroy(thread_obj); - if 
(@sizeOf(Context) > 0) { - thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced( - u8, - @alignOf(Context), - @sizeOf(Context), - .at_least, - ); - errdefer std.heap.c_allocator.free(thread_obj.data.memory); - mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context)); - } else { - thread_obj.data.memory = @as([*]u8, undefined)[0..0]; - } - - // Use the same set of parameters used by the libc-less impl. - assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0); - assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0); - - const err = c.pthread_create( - &thread_obj.data.handle, - &attr, - MainFuncs.posixThreadMain, - thread_obj.data.memory.ptr, - ); - switch (err) { - 0 => return thread_obj, - os.EAGAIN => return error.SystemResources, - os.EPERM => unreachable, - os.EINVAL => unreachable, - else => return os.unexpectedErrno(@intCast(usize, err)), - } - - return thread_obj; - } - - var guard_end_offset: usize = undefined; - var stack_end_offset: usize = undefined; - var thread_start_offset: usize = undefined; - var context_start_offset: usize = undefined; - var tls_start_offset: usize = undefined; - const mmap_len = blk: { - var l: usize = mem.page_size; - // Allocate a guard page right after the end of the stack region - guard_end_offset = l; - // The stack itself, which grows downwards. - l = mem.alignForward(l + default_stack_size, mem.page_size); - stack_end_offset = l; - // Above the stack, so that it can be in the same mmap call, put the Thread object. - l = mem.alignForward(l, @alignOf(Thread)); - thread_start_offset = l; - l += @sizeOf(Thread); - // Next, the Context object. - if (@sizeOf(Context) != 0) { - l = mem.alignForward(l, @alignOf(Context)); - context_start_offset = l; - l += @sizeOf(Context); - } - // Finally, the Thread Local Storage, if any. - l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align); - tls_start_offset = l; - l += os.linux.tls.tls_image.alloc_size; - // Round the size to the page size. 
- break :blk mem.alignForward(l, mem.page_size); - }; - - const mmap_slice = mem: { - // Map the whole stack with no rw permissions to avoid - // committing the whole region right away - const mmap_slice = os.mmap( - null, - mmap_len, - os.PROT_NONE, - os.MAP_PRIVATE | os.MAP_ANONYMOUS, - -1, - 0, - ) catch |err| switch (err) { - error.MemoryMappingNotSupported => unreachable, - error.AccessDenied => unreachable, - error.PermissionDenied => unreachable, - else => |e| return e, - }; - errdefer os.munmap(mmap_slice); - - // Map everything but the guard page as rw - os.mprotect( - mmap_slice[guard_end_offset..], - os.PROT_READ | os.PROT_WRITE, - ) catch |err| switch (err) { - error.AccessDenied => unreachable, - else => |e| return e, - }; - - break :mem mmap_slice; - }; - - const mmap_addr = @ptrToInt(mmap_slice.ptr); - - const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset)); - thread_ptr.data.memory = mmap_slice; - - var arg: usize = undefined; - if (@sizeOf(Context) != 0) { - arg = mmap_addr + context_start_offset; - const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg)); - context_ptr.* = context; - } - - if (std.Target.current.os.tag == .linux) { - const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | - os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | - os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | - os.CLONE_DETACHED | os.CLONE_SETTLS; - // This structure is only needed when targeting i386 - var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined; - - const tls_area = mmap_slice[tls_start_offset..]; - const tp_value = os.linux.tls.prepareTLS(tls_area); - - const newtls = blk: { - if (std.Target.current.cpu.arch == .i386) { - user_desc = os.linux.user_desc{ - .entry_number = os.linux.tls.tls_image.gdt_entry_number, - .base_addr = tp_value, - .limit = 0xfffff, - .seg_32bit = 1, - .contents = 0, // Data - .read_exec_only = 0, - 
.limit_in_pages = 1, - .seg_not_present = 0, - .useable = 1, - }; - break :blk @ptrToInt(&user_desc); - } else { - break :blk tp_value; - } - }; - - const rc = os.linux.clone( - MainFuncs.linuxThreadMain, - mmap_addr + stack_end_offset, - flags, - arg, - &thread_ptr.data.handle, - newtls, - &thread_ptr.data.handle, - ); - switch (os.errno(rc)) { - 0 => return thread_ptr, - os.EAGAIN => return error.ThreadQuotaExceeded, - os.EINVAL => unreachable, - os.ENOMEM => return error.SystemResources, - os.ENOSPC => unreachable, - os.EPERM => unreachable, - os.EUSERS => unreachable, - else => |err| return os.unexpectedErrno(err), - } - } else { - @compileError("Unsupported OS"); - } - } - - pub const CpuCountError = error{ - PermissionDenied, - SystemResources, - Unexpected, - }; - - pub fn cpuCount() CpuCountError!usize { - if (std.Target.current.os.tag == .linux) { - const cpu_set = try os.sched_getaffinity(0); - return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast - } - if (std.Target.current.os.tag == .windows) { - return os.windows.peb().NumberOfProcessors; - } - if (std.Target.current.os.tag == .openbsd) { - var count: c_int = undefined; - var count_size: usize = @sizeOf(c_int); - const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; - os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - } - var count: c_int = undefined; - var count_len: usize = @sizeOf(c_int); - const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; - os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - } - - pub fn getCurrentThreadId() u64 { - switch (std.Target.current.os.tag) { - .linux => { - // Use the syscall directly as musl doesn't provide a wrapper. 
- return @bitCast(u32, os.linux.gettid()); - }, - .windows => { - return os.windows.kernel32.GetCurrentThreadId(); - }, - .macos, .ios, .watchos, .tvos => { - var thread_id: u64 = undefined; - // Pass thread=null to get the current thread ID. - assert(c.pthread_threadid_np(null, &thread_id) == 0); - return thread_id; - }, - .netbsd => { - return @bitCast(u32, c._lwp_self()); - }, - .freebsd => { - return @bitCast(u32, c.pthread_getthreadid_np()); - }, - .openbsd => { - return @bitCast(u32, c.getthrid()); - }, - else => { - @compileError("getCurrentThreadId not implemented for this platform"); - }, - } - } -}; diff --git a/src/Compilation.zig b/src/Compilation.zig index 0efad3362d..225a91e5d2 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -125,7 +125,7 @@ owned_link_dir: ?std.fs.Dir, color: @import("main.zig").Color = .auto, /// This mutex guards all `Compilation` mutable state. -mutex: std.Mutex = .{}, +mutex: std.Thread.Mutex = .{}, test_filter: ?[]const u8, test_name_prefix: ?[]const u8, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 1e91d3f731..ec580210e9 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -6,14 +6,14 @@ const std = @import("std"); const ThreadPool = @This(); -lock: std.Mutex = .{}, +lock: std.Thread.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, workers: []Worker, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, -const IdleQueue = std.SinglyLinkedList(std.ResetEvent); +const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: fn (*Runnable) void, diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index bd6274c10a..d690143710 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -6,9 +6,9 @@ const std = @import("std"); const WaitGroup = @This(); -lock: std.Mutex = .{}, +lock: std.Thread.Mutex = .{}, counter: usize = 0, -event: std.ResetEvent, +event: std.Thread.ResetEvent, pub fn init(self: 
*WaitGroup) !void { self.* = .{ -- cgit v1.2.3