aboutsummaryrefslogtreecommitdiff
path: root/lib/std
diff options
context:
space:
mode:
Diffstat (limited to 'lib/std')
-rw-r--r--lib/std/Build/Fuzz.zig6
-rw-r--r--lib/std/Io.zig87
-rw-r--r--lib/std/Io/Dir.zig5
-rw-r--r--lib/std/Io/Kqueue.zig13
-rw-r--r--lib/std/Io/Threaded.zig245
-rw-r--r--lib/std/Io/Threaded/test.zig2
-rw-r--r--lib/std/Io/net.zig14
-rw-r--r--lib/std/Io/net/HostName.zig2
-rw-r--r--lib/std/Io/net/test.zig2
-rw-r--r--lib/std/Io/test.zig37
-rw-r--r--lib/std/Progress.zig30
-rw-r--r--lib/std/c.zig15
-rw-r--r--lib/std/crypto.zig2
-rw-r--r--lib/std/crypto/argon2.zig30
-rw-r--r--lib/std/crypto/blake3.zig16
-rw-r--r--lib/std/crypto/kangarootwelve.zig16
-rw-r--r--lib/std/fs.zig4
-rw-r--r--lib/std/fs/get_app_data_dir.zig66
-rw-r--r--lib/std/fs/test.zig4
-rw-r--r--lib/std/mem.zig48
-rw-r--r--lib/std/posix.zig42
-rw-r--r--lib/std/posix/test.zig9
-rw-r--r--lib/std/zig.zig1
-rw-r--r--lib/std/zig/WindowsSdk.zig9
24 files changed, 486 insertions, 219 deletions
diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig
index d308efdf70..5521f7393b 100644
--- a/lib/std/Build/Fuzz.zig
+++ b/lib/std/Build/Fuzz.zig
@@ -78,7 +78,7 @@ pub fn init(
all_steps: []const *Build.Step,
root_prog_node: std.Progress.Node,
mode: Mode,
-) Allocator.Error!Fuzz {
+) error{ OutOfMemory, Canceled }!Fuzz {
const run_steps: []const *Step.Run = steps: {
var steps: std.ArrayList(*Step.Run) = .empty;
defer steps.deinit(gpa);
@@ -98,7 +98,7 @@ pub fn init(
if (steps.items.len == 0) fatal("no fuzz tests found", .{});
rebuild_node.setEstimatedTotalItems(steps.items.len);
const run_steps = try gpa.dupe(*Step.Run, steps.items);
- rebuild_group.wait(io);
+ try rebuild_group.await(io);
break :steps run_steps;
};
errdefer gpa.free(run_steps);
@@ -517,7 +517,7 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
assert(fuzz.mode == .limit);
const io = fuzz.io;
- fuzz.group.wait(io);
+ fuzz.group.awaitUncancelable(io);
fuzz.group = .init;
std.debug.print("======= FUZZING REPORT =======\n", .{});
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 7c3aa98e16..cf23796db2 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -436,7 +436,7 @@ pub fn Poller(comptime StreamEnum: type) type {
// Cancel the pending read into the FIFO.
_ = windows.kernel32.CancelIo(handle);
- // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
+ // We have to wait for the handle to be signalled, i.e. for the cancelation to complete.
switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
windows.WAIT_OBJECT_0 => {},
windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
@@ -631,7 +631,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
@@ -642,9 +642,9 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) ConcurrentError!void,
- groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
+ groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
recancel: *const fn (?*anyopaque) void,
@@ -713,8 +713,8 @@ pub const VTable = struct {
processExecutableOpen: *const fn (?*anyopaque, File.OpenFlags) std.process.OpenExecutableError!File,
processExecutablePath: *const fn (?*anyopaque, buffer: []u8) std.process.ExecutablePathError!usize,
- lockStderr: *const fn (?*anyopaque, buffer: []u8, ?Terminal.Mode) Cancelable!LockedStderr,
- tryLockStderr: *const fn (?*anyopaque, buffer: []u8, ?Terminal.Mode) Cancelable!?LockedStderr,
+ lockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!LockedStderr,
+ tryLockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!?LockedStderr,
unlockStderr: *const fn (?*anyopaque) void,
processSetCurrentDir: *const fn (?*anyopaque, Dir) std.process.SetCurrentDirError!void,
@@ -734,6 +734,7 @@ pub const VTable = struct {
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize,
netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void,
+ netShutdown: *const fn (?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void,
netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
@@ -1022,7 +1023,7 @@ pub fn Future(Result: type) type {
any_future: ?*AnyFuture,
result: Result,
- /// Equivalent to `await` but places a cancellation request. This causes the task to receive
+ /// Equivalent to `await` but places a cancelation request. This causes the task to receive
/// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
/// call to a function in `Io` which can return `error.Canceled`.
///
@@ -1070,7 +1071,7 @@ pub const Group = struct {
/// already been called and completed, or it has successfully been assigned
/// a unit of concurrency.
///
- /// After this is called, `wait` or `cancel` must be called before the
+ /// After this is called, `await` or `cancel` must be called before the
/// group is deinitialized.
///
/// Threadsafe.
@@ -1081,21 +1082,21 @@ pub const Group = struct {
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}
/// Calls `function` with `args`, such that the function is not guaranteed
- /// to have returned until `wait` is called, allowing the caller to
+ /// to have returned until `await` is called, allowing the caller to
/// progress while waiting for any `Io` operations.
///
/// The resource spawned is owned by the group; after this is called,
- /// `wait` or `cancel` must be called before the group is deinitialized.
+ /// `await` or `cancel` must be called before the group is deinitialized.
///
/// This has stronger guarantee than `async`, placing restrictions on what kind
/// of `Io` implementations are supported. By calling `async` instead, one
@@ -1109,30 +1110,41 @@ pub const Group = struct {
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}
/// Blocks until all tasks of the group finish. During this time,
- /// cancellation requests propagate to all members of the group.
+ /// cancelation requests propagate to all members of the group.
///
/// Idempotent. Not threadsafe.
///
/// It is safe to call this function concurrently with `Group.async` or
/// `Group.concurrent`, provided that the group does not complete until
/// the call to `Group.async` or `Group.concurrent` returns.
- pub fn wait(g: *Group, io: Io) void {
+ pub fn await(g: *Group, io: Io) Cancelable!void {
const token = g.token.load(.acquire) orelse return;
- io.vtable.groupWait(io.userdata, g, token);
+ try io.vtable.groupAwait(io.userdata, g, token);
assert(g.token.raw == null);
}
- /// Equivalent to `wait` but immediately requests cancellation on all
+ /// Equivalent to `await` but temporarily blocks cancelation while waiting.
+ pub fn awaitUncancelable(g: *Group, io: Io) void {
+ const token = g.token.load(.acquire) orelse return;
+ const prev = swapCancelProtection(io, .blocked);
+ defer _ = swapCancelProtection(io, prev);
+ io.vtable.groupAwait(io.userdata, g, token) catch |err| switch (err) {
+ error.Canceled => unreachable,
+ };
+ assert(g.token.raw == null);
+ }
+
+ /// Equivalent to `await` but immediately requests cancelation on all
/// members of the group.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
@@ -1253,7 +1265,7 @@ pub fn Select(comptime U: type) type {
) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const unerased_select: *S = @fieldParentPtr("group", group);
const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
@@ -1271,7 +1283,7 @@ pub fn Select(comptime U: type) type {
/// Asserts there is at least one more `outstanding` task.
///
/// Not threadsafe.
- pub fn wait(s: *S) Cancelable!U {
+ pub fn await(s: *S) Cancelable!U {
s.outstanding -= 1;
return s.queue.getOne(s.io) catch |err| switch (err) {
error.Canceled => |e| return e,
@@ -1279,7 +1291,7 @@ pub fn Select(comptime U: type) type {
};
}
- /// Equivalent to `wait` but requests cancellation on all remaining
+ /// Equivalent to `wait` but requests cancelation on all remaining
/// tasks owned by the select.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
@@ -1336,7 +1348,13 @@ pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, m
return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters);
}
-pub const Mutex = struct {
+/// Mutex is a synchronization primitive which enforces atomic access to a
+/// shared region of code known as the "critical section".
+///
+/// Mutex is an extern struct so that it may be used as a field inside another
+/// extern struct. Having a guaranteed memory layout including mutexes is
+/// important for IPC over shared memory (mmap).
+pub const Mutex = extern struct {
state: std.atomic.Value(State),
pub const init: Mutex = .{ .state = .init(.unlocked) };
@@ -2189,6 +2207,23 @@ pub const LockedStderr = struct {
.mode = ls.terminal_mode,
};
}
+
+ pub fn clear(ls: LockedStderr, buffer: []u8) Cancelable!void {
+ const fw = ls.file_writer;
+ std.Progress.clearWrittenWithEscapeCodes(fw) catch |err| switch (err) {
+ error.WriteFailed => switch (fw.err.?) {
+ error.Canceled => |e| return e,
+ else => {},
+ },
+ };
+ fw.interface.flush() catch |err| switch (err) {
+ error.WriteFailed => switch (fw.err.?) {
+ error.Canceled => |e| return e,
+ else => {},
+ },
+ };
+ fw.interface.buffer = buffer;
+ }
};
/// For doing application-level writes to the standard error stream.
@@ -2199,12 +2234,16 @@ pub const LockedStderr = struct {
/// See also:
/// * `tryLockStderr`
pub fn lockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!LockedStderr {
- return io.vtable.lockStderr(io.userdata, buffer, terminal_mode);
+ const ls = try io.vtable.lockStderr(io.userdata, terminal_mode);
+ try ls.clear(buffer);
+ return ls;
}
/// Same as `lockStderr` but non-blocking.
pub fn tryLockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!?LockedStderr {
- return io.vtable.tryLockStderr(io.userdata, buffer, terminal_mode);
+ const ls = (try io.vtable.tryLockStderr(io.userdata, buffer, terminal_mode)) orelse return null;
+ try ls.clear(buffer);
+ return ls;
}
pub fn unlockStderr(io: Io) void {
diff --git a/lib/std/Io/Dir.zig b/lib/std/Io/Dir.zig
index 577e114c40..82bf3b927d 100644
--- a/lib/std/Io/Dir.zig
+++ b/lib/std/Io/Dir.zig
@@ -113,6 +113,7 @@ pub const Reader = struct {
},
.wasi => @sizeOf(std.os.wasi.dirent_t) +
std.mem.alignForward(usize, max_name_bytes, @alignOf(std.os.wasi.dirent_t)),
+ .openbsd => std.c.S.BLKSIZE,
else => if (builtin.link_libc) @sizeOf(std.c.dirent) else std.mem.alignForward(usize, max_name_bytes, @alignOf(usize)),
};
@@ -373,8 +374,8 @@ pub const Walker = struct {
/// Leaves the current directory, continuing walking one level up.
/// If the current entry is a directory entry, then the "current directory"
/// is the directory pertaining to the current entry.
- pub fn leave(self: *Walker) void {
- self.inner.leave();
+ pub fn leave(self: *Walker, io: Io) void {
+ self.inner.leave(io);
}
};
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig
index 26b8298cab..df9fa1dee6 100644
--- a/lib/std/Io/Kqueue.zig
+++ b/lib/std/Io/Kqueue.zig
@@ -900,6 +900,7 @@ pub fn io(k: *Kqueue) Io {
.netConnectIp = netConnectIp,
.netConnectUnix = netConnectUnix,
.netClose = netClose,
+ .netShutdown = netShutdown,
.netRead = netRead,
.netWrite = netWrite,
.netSend = netSend,
@@ -1549,12 +1550,22 @@ fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8,
_ = splat;
@panic("TODO");
}
+
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = handle;
@panic("TODO");
}
+
+fn netShutdown(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+ const k: *Kqueue = @ptrCast(@alignCast(userdata));
+ _ = k;
+ _ = handle;
+ _ = how;
+ @panic("TODO");
+}
+
fn netInterfaceNameResolve(
userdata: ?*anyopaque,
name: *const net.Interface.Name,
@@ -1564,12 +1575,14 @@ fn netInterfaceNameResolve(
_ = name;
@panic("TODO");
}
+
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = interface;
@panic("TODO");
}
+
fn netLookup(
userdata: ?*anyopaque,
host_name: net.HostName,
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 9e758be9fd..c9e6ba3fc7 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -390,6 +390,52 @@ const Thread = struct {
else => unreachable,
};
},
+ .openbsd => {
+ var tm: std.c.timespec = undefined;
+ var tm_ptr: ?*const std.c.timespec = null;
+ if (timeout_ns) |ns| {
+ tm_ptr = &tm;
+ tm = timestampToPosix(ns);
+ }
+ if (thread) |t| try t.beginSyscall();
+ const rc = std.c.futex(
+ ptr,
+ std.c.FUTEX.WAIT | std.c.FUTEX.PRIVATE_FLAG,
+ @as(c_int, @bitCast(expect)),
+ tm_ptr,
+ null, // uaddr2 is ignored
+ );
+ if (thread) |t| t.endSyscall();
+ if (is_debug) switch (posix.errno(rc)) {
+ .SUCCESS => {},
+ .NOSYS => unreachable, // constant op known good value
+ .AGAIN => {}, // contents of uaddr != val
+ .INVAL => unreachable, // invalid timeout
+ .TIMEDOUT => {}, // timeout
+ .INTR => {}, // a signal arrived
+ .CANCELED => {}, // a signal arrived and SA_RESTART was set
+ else => unreachable,
+ };
+ },
+ .dragonfly => {
+ var timeout_us: c_int = undefined;
+ if (timeout_ns) |ns| {
+ timeout_us = std.math.cast(c_int, ns / std.time.ns_per_us) orelse std.math.maxInt(c_int);
+ } else {
+ timeout_us = 0;
+ }
+ if (thread) |t| try t.beginSyscall();
+ const rc = std.c.umtx_sleep(@ptrCast(ptr), @bitCast(expect), timeout_us);
+ if (thread) |t| t.endSyscall();
+ if (is_debug) switch (std.posix.errno(rc)) {
+ .SUCCESS => {},
+ .BUSY => {}, // ptr != expect
+ .AGAIN => {}, // maybe timed out, or paged out, or hit 2s kernel refresh
+ .INTR => {}, // spurious wake
+ .INVAL => unreachable, // invalid timeout
+ else => unreachable,
+ };
+ },
else => if (std.Thread.use_pthreads) {
// TODO integrate the following function being called with robust cancelation.
return pthreads_futex.wait(ptr, expect, timeout_ns) catch |err| switch (err) {
@@ -473,6 +519,23 @@ const Thread = struct {
else => unreachable, // deadlock due to operating system bug
}
},
+ .openbsd => {
+ const rc = std.c.futex(
+ ptr,
+ std.c.FUTEX.WAKE | std.c.FUTEX.PRIVATE_FLAG,
+ @min(max_waiters, std.math.maxInt(c_int)),
+ null, // timeout is ignored
+ null, // uaddr2 is ignored
+ );
+ assert(rc >= 0);
+ },
+ .dragonfly => {
+ // will generally return 0 unless the address is bad
+ _ = std.c.umtx_wakeup(
+ @ptrCast(ptr),
+ @min(max_waiters, std.math.maxInt(c_int)),
+ );
+ },
else => if (std.Thread.use_pthreads) {
return pthreads_futex.wake(ptr, max_waiters);
} else {
@@ -795,7 +858,7 @@ pub fn io(t: *Threaded) Io {
.groupAsync = groupAsync,
.groupConcurrent = groupConcurrent,
- .groupWait = groupWait,
+ .groupAwait = groupAwait,
.groupCancel = groupCancel,
.recancel = recancel,
@@ -891,6 +954,10 @@ pub fn io(t: *Threaded) Io {
else => netConnectUnixPosix,
},
.netClose = netClose,
+ .netShutdown = switch (native_os) {
+ .windows => netShutdownWindows,
+ else => netShutdownPosix,
+ },
.netRead = switch (native_os) {
.windows => netReadWindows,
else => netReadPosix,
@@ -929,7 +996,7 @@ pub fn ioBasic(t: *Threaded) Io {
.groupAsync = groupAsync,
.groupConcurrent = groupConcurrent,
- .groupWait = groupWait,
+ .groupAwait = groupAwait,
.groupCancel = groupCancel,
.recancel = recancel,
@@ -1007,6 +1074,7 @@ pub fn ioBasic(t: *Threaded) Io {
.netConnectIp = netConnectIpUnavailable,
.netConnectUnix = netConnectUnixUnavailable,
.netClose = netCloseUnavailable,
+ .netShutdown = netShutdownUnavailable,
.netRead = netReadUnavailable,
.netWrite = netWriteUnavailable,
.netWriteFile = netWriteFileUnavailable,
@@ -1161,6 +1229,7 @@ const AsyncClosure = struct {
error.Canceled => {
ac.closure.requestCancel(t);
ac.event.waitUncancelable(ioBasic(t));
+ recancel(t);
},
};
@memcpy(result, ac.resultPointer()[0..result.len]);
@@ -1273,7 +1342,7 @@ const GroupClosure = struct {
group: *Io.Group,
/// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
node: std.SinglyLinkedList.Node,
- func: *const fn (*Io.Group, context: *anyopaque) void,
+ func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void,
context_alignment: Alignment,
alloc_len: usize,
@@ -1286,7 +1355,7 @@ const GroupClosure = struct {
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;
- gc.func(group, gc.contextPointer());
+ assertResult(closure, gc.func(group, gc.contextPointer()));
current_thread.current_closure = null;
current_thread.cancel_protection = undefined;
@@ -1296,6 +1365,16 @@ const GroupClosure = struct {
if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t));
}
+ fn assertResult(closure: *Closure, result: Io.Cancelable!void) void {
+ if (result) |_| switch (closure.cancel_status.unpack()) {
+ .none, .requested => {},
+ .acknowledged => unreachable, // task illegally swallowed error.Canceled
+ .signal_id => unreachable,
+ } else |err| switch (err) {
+ error.Canceled => assert(closure.cancel_status == .acknowledged),
+ }
+ }
+
fn contextPointer(gc: *GroupClosure) [*]u8 {
const base: [*]u8 = @ptrCast(gc);
const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
@@ -1308,7 +1387,7 @@ const GroupClosure = struct {
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- func: *const fn (*Io.Group, context: *const anyopaque) void,
+ func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Allocator.Error!*GroupClosure {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
@@ -1346,14 +1425,14 @@ fn groupAsync(
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
- if (builtin.single_threaded) return start(group, context.ptr);
+ if (builtin.single_threaded) return start(group, context.ptr) catch unreachable;
const gpa = t.allocator;
const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
t.mutex.lock();
@@ -1362,7 +1441,7 @@ fn groupAsync(
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
}
t.busy_count = busy_count + 1;
@@ -1375,7 +1454,7 @@ fn groupAsync(
t.busy_count = busy_count;
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
};
thread.detach();
}
@@ -1396,12 +1475,18 @@ fn groupAsync(
t.cond.signal();
}
+fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void {
+ const current_thread: *Thread = .getCurrent(t);
+ const current_closure = current_thread.current_closure orelse return;
+ GroupClosure.assertResult(current_closure, result);
+}
+
fn groupConcurrent(
userdata: ?*anyopaque,
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Io.ConcurrentError!void {
if (builtin.single_threaded) return error.ConcurrencyUnavailable;
@@ -1447,7 +1532,7 @@ fn groupConcurrent(
t.cond.signal();
}
-fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void {
+fn groupAwait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const gpa = t.allocator;
@@ -1459,16 +1544,14 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque)
const event: *Io.Event = @ptrCast(&group.context);
const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
assert(prev_state & GroupClosure.sync_is_waiting == 0);
- if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) {
- error.Canceled => {
- var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic)));
- while (it) |node| : (it = node.next) {
- const gc: *GroupClosure = @fieldParentPtr("node", node);
- gc.closure.requestCancel(t);
- }
- event.waitUncancelable(ioBasic(t));
- },
- };
+ {
+ errdefer _ = group_state.fetchSub(GroupClosure.sync_is_waiting, .monotonic);
+ // This event.wait can return error.Canceled, in which case this logic does
+ // *not* propagate cancel requests to each group member. Instead, the user
+ // code will likely do this with a defered call to groupCancel, or,
+ // intentionally not do this.
+ if ((prev_state / GroupClosure.sync_one_pending) > 0) try event.wait(ioBasic(t));
+ }
// Since the group has now finished, it's illegal to add more tasks to it until we return. It's
// also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only
@@ -10390,6 +10473,89 @@ fn netCloseUnavailable(userdata: ?*anyopaque, handles: []const net.Socket.Handle
unreachable; // How you gonna close something that was impossible to open?
}
+fn netShutdownPosix(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+ if (!have_networking) return error.NetworkDown;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const current_thread = Thread.getCurrent(t);
+
+ const posix_how: i32 = switch (how) {
+ .recv => posix.SHUT.RD,
+ .send => posix.SHUT.WR,
+ .both => posix.SHUT.RDWR,
+ };
+
+ try current_thread.beginSyscall();
+ while (true) {
+ switch (posix.errno(posix.system.shutdown(handle, posix_how))) {
+ .SUCCESS => {
+ current_thread.endSyscall();
+ return;
+ },
+ .INTR => {
+ try current_thread.checkCancel();
+ continue;
+ },
+ else => |e| {
+ current_thread.endSyscall();
+ switch (e) {
+ .BADF, .NOTSOCK, .INVAL => |err| return errnoBug(err),
+ .NOTCONN => return error.SocketUnconnected,
+ .NOBUFS => return error.SystemResources,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ },
+ }
+ }
+}
+
+fn netShutdownWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+ if (!have_networking) return error.NetworkDown;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const current_thread = Thread.getCurrent(t);
+
+ const wsa_how: i32 = switch (how) {
+ .recv => ws2_32.SD_RECEIVE,
+ .send => ws2_32.SD_SEND,
+ .both => ws2_32.SD_BOTH,
+ };
+
+ try current_thread.beginSyscall();
+ while (true) {
+ const rc = ws2_32.shutdown(handle, wsa_how);
+ if (rc != ws2_32.SOCKET_ERROR) {
+ current_thread.endSyscall();
+ return;
+ }
+ switch (ws2_32.WSAGetLastError()) {
+ .EINTR => {
+ try current_thread.checkCancel();
+ continue;
+ },
+ .NOTINITIALISED => {
+ try initializeWsa(t);
+ try current_thread.checkCancel();
+ continue;
+ },
+ else => |e| {
+ current_thread.endSyscall();
+ switch (e) {
+ .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
+ .ECONNABORTED => return error.ConnectionAborted,
+ .ECONNRESET => return error.ConnectionResetByPeer,
+ .ENETDOWN => return error.NetworkDown,
+ .ENOTCONN => return error.SocketUnconnected,
+ .EINVAL, .ENOTSOCK => |err| return wsaErrorBug(err),
+ else => |err| return windows.unexpectedWSAError(err),
+ }
+ },
+ }
+ }
+}
+
+fn netShutdownUnavailable(_: ?*anyopaque, _: net.Socket.Handle, _: net.ShutdownHow) net.ShutdownError!void {
+ unreachable; // How you gonna shutdown something that was impossible to open?
+}
+
fn netInterfaceNameResolve(
userdata: ?*anyopaque,
name: *const net.Interface.Name,
@@ -10776,33 +10942,21 @@ fn netLookupFallible(
return error.OptionUnsupported;
}
-fn lockStderr(
- userdata: ?*anyopaque,
- buffer: []u8,
- terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!Io.LockedStderr {
+fn lockStderr(userdata: ?*anyopaque, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!Io.LockedStderr {
const t: *Threaded = @ptrCast(@alignCast(userdata));
// Only global mutex since this is Threaded.
std.process.stderr_thread_mutex.lock();
- return initLockedStderr(t, buffer, terminal_mode);
+ return initLockedStderr(t, terminal_mode);
}
-fn tryLockStderr(
- userdata: ?*anyopaque,
- buffer: []u8,
- terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!?Io.LockedStderr {
+fn tryLockStderr(userdata: ?*anyopaque, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!?Io.LockedStderr {
const t: *Threaded = @ptrCast(@alignCast(userdata));
// Only global mutex since this is Threaded.
if (!std.process.stderr_thread_mutex.tryLock()) return null;
- return try initLockedStderr(t, buffer, terminal_mode);
+ return try initLockedStderr(t, terminal_mode);
}
-fn initLockedStderr(
- t: *Threaded,
- buffer: []u8,
- terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!Io.LockedStderr {
+fn initLockedStderr(t: *Threaded, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!Io.LockedStderr {
if (!t.stderr_writer_initialized) {
const io_t = ioBasic(t);
if (is_windows) t.stderr_writer.file = .stderr();
@@ -10813,19 +10967,6 @@ fn initLockedStderr(
const CLICOLOR_FORCE = t.environ.exist.CLICOLOR_FORCE;
t.stderr_mode = terminal_mode orelse try .detect(io_t, t.stderr_writer.file, NO_COLOR, CLICOLOR_FORCE);
}
- std.Progress.clearWrittenWithEscapeCodes(&t.stderr_writer) catch |err| switch (err) {
- error.WriteFailed => switch (t.stderr_writer.err.?) {
- error.Canceled => |e| return e,
- else => {},
- },
- };
- t.stderr_writer.interface.flush() catch |err| switch (err) {
- error.WriteFailed => switch (t.stderr_writer.err.?) {
- error.Canceled => |e| return e,
- else => {},
- },
- };
- t.stderr_writer.interface.buffer = buffer;
return .{
.file_writer = &t.stderr_writer,
.terminal_mode = terminal_mode orelse t.stderr_mode,
diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig
index 8169f6bb37..faf1a5cb64 100644
--- a/lib/std/Io/Threaded/test.zig
+++ b/lib/std/Io/Threaded/test.zig
@@ -124,7 +124,7 @@ test "Group.async context alignment" {
var group: std.Io.Group = .init;
var result: ByteArray512 = undefined;
group.async(io, concatByteArraysResultPtr, .{ a, b, &result });
- group.wait(io);
+ group.awaitUncancelable(io);
try std.testing.expectEqualSlices(u8, &expected.x, &result.x);
}
diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig
index 8b1523fbd3..76a581180d 100644
--- a/lib/std/Io/net.zig
+++ b/lib/std/Io/net.zig
@@ -954,6 +954,16 @@ pub const SendFlags = packed struct(u8) {
_: u3 = 0,
};
+pub const ShutdownHow = enum { recv, send, both };
+
+pub const ShutdownError = error{
+ ConnectionAborted,
+ ConnectionResetByPeer,
+ NetworkDown,
+ SocketUnconnected,
+ SystemResources,
+} || Io.UnexpectedError || Io.Cancelable;
+
pub const Interface = struct {
/// Value 0 indicates `none`.
index: u32,
@@ -1191,6 +1201,10 @@ pub const Stream = struct {
io.vtable.netClose(io.userdata, (&s.socket.handle)[0..1]);
}
+ pub fn shutdown(s: *const Stream, io: Io, how: ShutdownHow) ShutdownError!void {
+ return io.vtable.netShutdown(io.userdata, s.socket.handle, how);
+ }
+
pub const Reader = struct {
io: Io,
interface: Io.Reader,
diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig
index 84484b9dc1..03f92cc022 100644
--- a/lib/std/Io/net/HostName.zig
+++ b/lib/std/Io/net/HostName.zig
@@ -289,7 +289,7 @@ pub fn connectMany(
} else |err| switch (err) {
error.Canceled => |e| return e,
error.Closed => {
- group.wait(io);
+ try group.await(io);
return lookup_future.await(io);
},
}
diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig
index 6ef8c15f4f..45c26f9540 100644
--- a/lib/std/Io/net/test.zig
+++ b/lib/std/Io/net/test.zig
@@ -346,6 +346,8 @@ test "non-blocking tcp server" {
const len = try socket_file.read(&buf);
const msg = buf[0..len];
try testing.expect(mem.eql(u8, msg, "hello from server\n"));
+
+ try stream.shutdown(io, .both);
}
test "decompress compressed DNS name" {
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index a317c822cb..582ad06615 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -194,7 +194,7 @@ test "Group" {
group.async(io, count, .{ 1, 10, &results[0] });
group.async(io, count, .{ 20, 30, &results[1] });
- group.wait(io);
+ group.awaitUncancelable(io);
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}
@@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void {
result.* = sum;
}
-test "Group cancellation" {
+test "Group cancelation" {
const io = testing.io;
var group: Io.Group = .init;
- var results: [2]usize = undefined;
+ var results: [4]usize = .{ 0, 0, 0, 0 };
+ // TODO when robust cancelation is available, make the sleep timeouts much
+ // longer so that it causes the unit test to be failed if not canceled.
+ // https://codeberg.org/ziglang/zig/issues/30049
group.async(io, sleep, .{ io, &results[0] });
group.async(io, sleep, .{ io, &results[1] });
+ group.async(io, sleepUncancelable, .{ io, &results[2] });
+ group.async(io, sleepRecancel, .{ io, &results[3] });
group.cancel(io);
- try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results);
+ try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results);
+}
+
+fn sleep(io: Io, result: *usize) error{Canceled}!void {
+ defer result.* = 1;
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => |e| return e,
+ else => {},
+ };
}
-fn sleep(io: Io, result: *usize) void {
- // TODO when cancellation race bug is fixed, make this timeout much longer so that
- // it causes the unit test to be failed if not canceled.
+fn sleepUncancelable(io: Io, result: *usize) void {
+ const old_prot = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(old_prot);
io.sleep(.fromMilliseconds(1), .awake) catch {};
result.* = 1;
}
+fn sleepRecancel(io: Io, result: *usize) void {
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => io.recancel(),
+ else => {},
+ };
+ result.* = 1;
+}
+
test "Group concurrent" {
const io = testing.io;
@@ -249,7 +270,7 @@ test "Group concurrent" {
},
};
- group.wait(io);
+ try group.await(io);
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
index 6baa24d246..f0baca2784 100644
--- a/lib/std/Progress.zig
+++ b/lib/std/Progress.zig
@@ -525,8 +525,8 @@ pub fn start(io: Io, options: Options) Node {
if (switch (global_progress.terminal_mode) {
.off => unreachable, // handled a few lines above
- .ansi_escape_codes => io.concurrent(updateThreadRun, .{io}),
- .windows_api => if (is_windows) io.concurrent(windowsApiUpdateThreadRun, .{io}) else unreachable,
+ .ansi_escape_codes => io.concurrent(updateTask, .{io}),
+ .windows_api => if (is_windows) io.concurrent(windowsApiUpdateTask, .{io}) else unreachable,
}) |future| {
global_progress.update_worker = future;
} else |err| {
@@ -561,18 +561,23 @@ fn wait(io: Io, timeout_ns: u64) bool {
return resize_flag or (global_progress.cols == 0);
}
-fn updateThreadRun(io: Io) void {
+fn updateTask(io: Io) void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
+ // In this function we bypass the wrapper code inside `Io.lockStderr` /
+ // `Io.tryLockStderr` in order to avoid clearing the terminal twice.
+ // We still want to go through the `Io` instance however in case it uses a
+ // task-switching mutex.
+
{
const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
- if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
defer io.unlockStderr();
global_progress.need_clear = true;
locked_stderr.file_writer.interface.writeAll(buffer) catch return;
@@ -583,7 +588,7 @@ fn updateThreadRun(io: Io) void {
const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- const stderr = io.lockStderr(&.{}, null) catch return;
+ const stderr = io.vtable.lockStderr(io.userdata, null) catch return;
defer io.unlockStderr();
return clearWrittenWithEscapeCodes(stderr.file_writer) catch {};
}
@@ -591,7 +596,7 @@ fn updateThreadRun(io: Io) void {
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
- if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
defer io.unlockStderr();
global_progress.need_clear = true;
locked_stderr.file_writer.interface.writeAll(buffer) catch return;
@@ -607,16 +612,21 @@ fn windowsApiWriteMarker() void {
_ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null);
}
-fn windowsApiUpdateThreadRun(io: Io) void {
+fn windowsApiUpdateTask(io: Io) void {
var serialized_buffer: Serialized.Buffer = undefined;
+ // In this function we bypass the wrapper code inside `Io.lockStderr` /
+ // `Io.tryLockStderr` in order to avoid clearing the terminal twice.
+ // We still want to go through the `Io` instance however in case it uses a
+ // task-switching mutex.
+
{
const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
- if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
defer io.unlockStderr();
windowsApiWriteMarker();
global_progress.need_clear = true;
@@ -629,7 +639,7 @@ fn windowsApiUpdateThreadRun(io: Io) void {
const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- _ = io.lockStderr(&.{}, null) catch return;
+ _ = io.vtable.lockStderr(io.userdata, null) catch return;
defer io.unlockStderr();
return clearWrittenWindowsApi() catch {};
}
@@ -637,7 +647,7 @@ fn windowsApiUpdateThreadRun(io: Io) void {
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
- if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
defer io.unlockStderr();
clearWrittenWindowsApi() catch return;
windowsApiWriteMarker();
diff --git a/lib/std/c.zig b/lib/std/c.zig
index 5f0d2dedf8..e9810aeb09 100644
--- a/lib/std/c.zig
+++ b/lib/std/c.zig
@@ -167,6 +167,18 @@ pub const timespec = switch (native_os) {
.openbsd, .haiku => extern struct {
sec: time_t,
nsec: isize,
+
+ /// For use with `utimensat` and `futimens`.
+ pub const NOW: timespec = .{
+ .sec = 0, // ignored
+ .nsec = -2,
+ };
+
+ /// For use with `utimensat` and `futimens`.
+ pub const OMIT: timespec = .{
+ .sec = 0, // ignored
+ .nsec = -1,
+ };
},
else => void,
};
@@ -2365,6 +2377,8 @@ pub const S = switch (native_os) {
pub const IWOTH = 0o002;
pub const IXOTH = 0o001;
+ pub const BLKSIZE = 512;
+
pub fn ISFIFO(m: u32) bool {
return m & IFMT == IFIFO;
}
@@ -9676,6 +9690,7 @@ pub const NSIG = switch (native_os) {
.illumos => 75,
// https://github.com/SerenityOS/serenity/blob/046c23f567a17758d762a33bdf04bacbfd088f9f/Kernel/API/POSIX/signal_numbers.h#L42
.openbsd, .serenity => 33,
+ .dragonfly => 64,
else => {},
};
diff --git a/lib/std/crypto.zig b/lib/std/crypto.zig
index a942d6538f..bbcba6a02b 100644
--- a/lib/std/crypto.zig
+++ b/lib/std/crypto.zig
@@ -184,7 +184,7 @@ pub const pwhash = struct {
pub const Error = HasherError || error{AllocatorRequired};
pub const HasherError = KdfError || phc_format.Error;
- pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError;
+ pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError || std.Io.Cancelable;
pub const argon2 = @import("crypto/argon2.zig");
pub const bcrypt = @import("crypto/bcrypt.zig");
diff --git a/lib/std/crypto/argon2.zig b/lib/std/crypto/argon2.zig
index 42165ca524..aae8400679 100644
--- a/lib/std/crypto/argon2.zig
+++ b/lib/std/crypto/argon2.zig
@@ -2,9 +2,9 @@
// https://github.com/golang/crypto/tree/master/argon2
// https://github.com/P-H-C/phc-winner-argon2
-const std = @import("std");
const builtin = @import("builtin");
+const std = @import("std");
const blake2 = crypto.hash.blake2;
const crypto = std.crypto;
const Io = std.Io;
@@ -53,23 +53,24 @@ pub const Mode = enum {
pub const Params = struct {
const Self = @This();
- /// A [t]ime cost, which defines the amount of computation realized and therefore the execution
+ /// Time cost, which defines the amount of computation realized and therefore the execution
/// time, given in number of iterations.
t: u32,
- /// A [m]emory cost, which defines the memory usage, given in kibibytes.
+ /// Memory cost, which defines the memory usage, given in kibibytes.
m: u32,
- /// A [p]arallelism degree, which defines the number of parallel threads.
+ /// Parallelism degree, which defines the number of independent tasks,
+ /// to be multiplexed onto threads when possible.
p: u24,
- /// The [secret] parameter, which is used for keyed hashing. This allows a secret key to be input
+ /// The secret parameter, which is used for keyed hashing. This allows a secret key to be input
/// at hashing time (from some external location) and be folded into the value of the hash. This
/// means that even if your salts and hashes are compromised, an attacker cannot brute-force to
/// find the password without the key.
secret: ?[]const u8 = null,
- /// The [ad] parameter, which is used to fold any additional data into the hash value. Functionally,
+ /// The ad parameter, which is used to fold any additional data into the hash value. Functionally,
/// this behaves almost exactly like the secret or salt parameters; the ad parameter is folding
/// into the value of the hash. However, this parameter is used for different data. The salt
/// should be a random string stored alongside your password. The secret should be a random key
@@ -209,18 +210,18 @@ fn processBlocks(
threads: u24,
mode: Mode,
io: Io,
-) void {
+) Io.Cancelable!void {
const lanes = memory / threads;
const segments = lanes / sync_points;
if (builtin.single_threaded or threads == 1) {
- processBlocksSt(blocks, time, memory, threads, mode, lanes, segments);
+ processBlocksSync(blocks, time, memory, threads, mode, lanes, segments);
} else {
- processBlocksMt(blocks, time, memory, threads, mode, lanes, segments, io);
+ try processBlocksAsync(blocks, time, memory, threads, mode, lanes, segments, io);
}
}
-fn processBlocksSt(
+fn processBlocksSync(
blocks: *Blocks,
time: u32,
memory: u32,
@@ -241,7 +242,7 @@ fn processBlocksSt(
}
}
-fn processBlocksMt(
+fn processBlocksAsync(
blocks: *Blocks,
time: u32,
memory: u32,
@@ -250,19 +251,20 @@ fn processBlocksMt(
lanes: u32,
segments: u32,
io: Io,
-) void {
+) Io.Cancelable!void {
var n: u32 = 0;
while (n < time) : (n += 1) {
var slice: u32 = 0;
while (slice < sync_points) : (slice += 1) {
var group: Io.Group = .init;
+ defer group.cancel(io);
var lane: u24 = 0;
while (lane < threads) : (lane += 1) {
group.async(io, processSegment, .{
blocks, time, memory, threads, mode, lanes, segments, n, slice, lane,
});
}
- group.wait(io);
+ try group.await(io);
}
}
}
@@ -503,7 +505,7 @@ pub fn kdf(
blocks.appendNTimesAssumeCapacity(@splat(0), memory);
initBlocks(&blocks, &h0, memory, params.p);
- processBlocks(&blocks, params.t, memory, params.p, mode, io);
+ try processBlocks(&blocks, params.t, memory, params.p, mode, io);
finalize(&blocks, memory, params.p, derived_key);
}
diff --git a/lib/std/crypto/blake3.zig b/lib/std/crypto/blake3.zig
index 53b28c24a1..1a1afab3ee 100644
--- a/lib/std/crypto/blake3.zig
+++ b/lib/std/crypto/blake3.zig
@@ -1,9 +1,11 @@
-const std = @import("std");
const builtin = @import("builtin");
+
+const std = @import("std");
const fmt = std.fmt;
const mem = std.mem;
const Io = std.Io;
const Thread = std.Thread;
+const Allocator = std.mem.Allocator;
const Vec4 = @Vector(4, u32);
const Vec8 = @Vector(8, u32);
@@ -767,7 +769,7 @@ fn buildMerkleTreeLayerParallel(
key: [8]u32,
flags: Flags,
io: Io,
-) void {
+) Io.Cancelable!void {
const num_parents = input_cvs.len / 2;
// Process sequentially with SIMD for smaller tree layers to avoid thread overhead
@@ -787,6 +789,7 @@ fn buildMerkleTreeLayerParallel(
const num_workers = Thread.getCpuCount() catch 1;
const parents_per_worker = (num_parents + num_workers - 1) / num_workers;
var group: Io.Group = .init;
+ defer group.cancel(io);
for (0..num_workers) |worker_id| {
const start_idx = worker_id * parents_per_worker;
@@ -801,7 +804,7 @@ fn buildMerkleTreeLayerParallel(
.flags = flags,
}});
}
- group.wait(io);
+ try group.await(io);
}
fn parentOutput(parent_block: []const u8, key: [8]u32, flags: Flags) Output {
@@ -987,7 +990,7 @@ pub const Blake3 = struct {
d.final(out);
}
- pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: std.mem.Allocator, io: Io) !void {
+ pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: Allocator, io: Io) error{ OutOfMemory, Canceled }!void {
if (b.len < parallel_threshold) {
return hash(b, out, options);
}
@@ -1008,6 +1011,7 @@ pub const Blake3 = struct {
const num_workers = thread_count;
const chunks_per_worker = (num_full_chunks + num_workers - 1) / num_workers;
var group: Io.Group = .init;
+ defer group.cancel(io);
for (0..num_workers) |worker_id| {
const start_chunk = worker_id * chunks_per_worker;
@@ -1022,7 +1026,7 @@ pub const Blake3 = struct {
.flags = flags,
}});
}
- group.wait(io);
+ try group.await(io);
// Build Merkle tree in parallel layers using ping-pong buffers
const max_intermediate_size = (num_full_chunks + 1) / 2;
@@ -1040,7 +1044,7 @@ pub const Blake3 = struct {
const has_odd = current_level.len % 2 == 1;
const next_level_size = num_parents + @intFromBool(has_odd);
- buildMerkleTreeLayerParallel(
+ try buildMerkleTreeLayerParallel(
current_level[0 .. num_parents * 2],
next_level_buf[0..num_parents],
key_words,
diff --git a/lib/std/crypto/kangarootwelve.zig b/lib/std/crypto/kangarootwelve.zig
index 08adb3ba0b..944381ec4b 100644
--- a/lib/std/crypto/kangarootwelve.zig
+++ b/lib/std/crypto/kangarootwelve.zig
@@ -1,9 +1,10 @@
-const std = @import("std");
const builtin = @import("builtin");
+
+const std = @import("std");
const crypto = std.crypto;
const Allocator = std.mem.Allocator;
const Io = std.Io;
-const Thread = std.Thread;
+const assert = std.debug.assert;
const TurboSHAKE128State = crypto.hash.sha3.TurboShake128(0x06);
const TurboSHAKE256State = crypto.hash.sha3.TurboShake256(0x06);
@@ -598,7 +599,7 @@ inline fn processNLeaves(
output: []align(@alignOf(u64)) u8,
) void {
const cv_size = Variant.cv_size;
- comptime std.debug.assert(cv_size % @sizeOf(u64) == 0);
+ comptime assert(cv_size % @sizeOf(u64) == 0);
if (view.tryGetSlice(j, j + N * chunk_size)) |leaf_data| {
var leaf_cvs: [N * cv_size]u8 = undefined;
@@ -645,7 +646,7 @@ fn processLeafBatch(comptime Variant: type, ctx: LeafBatchContext) void {
j += chunk_len;
}
- std.debug.assert(cvs_offset == ctx.output_cvs.len);
+ assert(cvs_offset == ctx.output_cvs.len);
}
/// Helper to process N leaves in SIMD and absorb CVs into state
@@ -841,7 +842,7 @@ fn ktMultiThreaded(
total_len: usize,
output: []u8,
) !void {
- comptime std.debug.assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
+ comptime assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
const cv_size = Variant.cv_size;
const StateType = Variant.StateType;
@@ -883,6 +884,7 @@ fn ktMultiThreaded(
var pending_cv_lens: [256]usize = .{0} ** 256;
var select: Select = .init(io, select_buf);
+ defer select.cancel();
var batches_spawned: usize = 0;
var next_to_process: usize = 0;
@@ -901,7 +903,7 @@ fn ktMultiThreaded(
batches_spawned += 1;
}
- const result = select.wait() catch unreachable;
+ const result = try select.await();
const batch = result.batch;
const slot = batch.batch_idx % max_concurrent;
@@ -925,7 +927,7 @@ fn ktMultiThreaded(
}
}
- select.group.wait(io);
+ assert(select.outstanding == 0);
}
if (has_partial_leaf) {
diff --git a/lib/std/fs.zig b/lib/std/fs.zig
index 5f2d36323a..4d149dbbda 100644
--- a/lib/std/fs.zig
+++ b/lib/std/fs.zig
@@ -6,9 +6,6 @@ const std = @import("std.zig");
pub const path = @import("fs/path.zig");
pub const wasi = @import("fs/wasi.zig");
-pub const getAppDataDir = @import("fs/get_app_data_dir.zig").getAppDataDir;
-pub const GetAppDataDirError = @import("fs/get_app_data_dir.zig").GetAppDataDirError;
-
pub const base64_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_".*;
/// Base64 encoder, replacing the standard `+/` with `-_` so that it can be used in a file name on any filesystem.
@@ -25,5 +22,4 @@ pub const max_name_bytes = std.Io.Dir.max_name_bytes;
test {
_ = path;
_ = @import("fs/test.zig");
- _ = @import("fs/get_app_data_dir.zig");
}
diff --git a/lib/std/fs/get_app_data_dir.zig b/lib/std/fs/get_app_data_dir.zig
deleted file mode 100644
index 24741206cc..0000000000
--- a/lib/std/fs/get_app_data_dir.zig
+++ /dev/null
@@ -1,66 +0,0 @@
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const unicode = std.unicode;
-const mem = std.mem;
-const fs = std.fs;
-const native_os = builtin.os.tag;
-const posix = std.posix;
-
-pub const GetAppDataDirError = error{
- OutOfMemory,
- AppDataDirUnavailable,
-};
-
-/// Caller owns returned memory.
-/// TODO determine if we can remove the allocator requirement
-pub fn getAppDataDir(allocator: mem.Allocator, appname: []const u8) GetAppDataDirError![]u8 {
- switch (native_os) {
- .windows => {
- const local_app_data_dir = std.process.getEnvVarOwned(allocator, "LOCALAPPDATA") catch |err| switch (err) {
- error.OutOfMemory => |e| return e,
- else => return error.AppDataDirUnavailable,
- };
- defer allocator.free(local_app_data_dir);
- return fs.path.join(allocator, &[_][]const u8{ local_app_data_dir, appname });
- },
- .maccatalyst, .macos => {
- const home_dir = posix.getenv("HOME") orelse {
- // TODO look in /etc/passwd
- return error.AppDataDirUnavailable;
- };
- return fs.path.join(allocator, &[_][]const u8{ home_dir, "Library", "Application Support", appname });
- },
- .linux, .freebsd, .netbsd, .dragonfly, .openbsd, .illumos, .serenity => {
- if (posix.getenv("XDG_DATA_HOME")) |xdg| {
- if (xdg.len > 0) {
- return fs.path.join(allocator, &[_][]const u8{ xdg, appname });
- }
- }
-
- const home_dir = posix.getenv("HOME") orelse {
- // TODO look in /etc/passwd
- return error.AppDataDirUnavailable;
- };
- return fs.path.join(allocator, &[_][]const u8{ home_dir, ".local", "share", appname });
- },
- .haiku => {
- var dir_path_buf: [std.fs.max_path_bytes]u8 = undefined;
- const rc = std.c.find_directory(.B_USER_SETTINGS_DIRECTORY, -1, true, &dir_path_buf, dir_path_buf.len);
- const settings_dir = try allocator.dupeZ(u8, mem.sliceTo(&dir_path_buf, 0));
- defer allocator.free(settings_dir);
- switch (rc) {
- 0 => return fs.path.join(allocator, &[_][]const u8{ settings_dir, appname }),
- else => return error.AppDataDirUnavailable,
- }
- },
- else => @compileError("Unsupported OS"),
- }
-}
-
-test getAppDataDir {
- if (native_os == .wasi) return error.SkipZigTest;
-
- // We can't actually validate the result
- const dir = getAppDataDir(std.testing.allocator, "zig") catch return;
- defer std.testing.allocator.free(dir);
-}
diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig
index bcb9048e0e..94cf3055e5 100644
--- a/lib/std/fs/test.zig
+++ b/lib/std/fs/test.zig
@@ -645,6 +645,7 @@ fn contains(entries: *const std.array_list.Managed(Dir.Entry), el: Dir.Entry) bo
test "Dir.realPath smoke test" {
if (native_os == .wasi) return error.SkipZigTest;
+ if (native_os == .openbsd) return error.SkipZigTest;
try testWithAllSupportedPathTypes(struct {
fn impl(ctx: *TestContext) !void {
@@ -820,7 +821,7 @@ test "file operations on directories" {
try expectError(error.IsDir, ctx.dir.createFile(io, test_dir_name, .{}));
try expectError(error.IsDir, ctx.dir.deleteFile(io, test_dir_name));
switch (native_os) {
- .dragonfly, .netbsd => {
+ .netbsd => {
// no error when reading a directory. See https://github.com/ziglang/zig/issues/5732
const buf = try ctx.dir.readFileAlloc(io, test_dir_name, testing.allocator, .unlimited);
testing.allocator.free(buf);
@@ -1240,6 +1241,7 @@ test "createDirPath, put some files in it, deleteTreeMinStackSize" {
test "createDirPath in a directory that no longer exists" {
if (native_os == .windows) return error.SkipZigTest; // Windows returns FileBusy if attempting to remove an open dir
+ if (native_os == .dragonfly) return error.SkipZigTest; // DragonflyBSD does not produce error (hammer2 fs)
const io = testing.io;
diff --git a/lib/std/mem.zig b/lib/std/mem.zig
index 76ee5f6a80..6612c6d618 100644
--- a/lib/std/mem.zig
+++ b/lib/std/mem.zig
@@ -2252,16 +2252,20 @@ test writeVarPackedInt {
/// Swap the byte order of all the members of the fields of a struct
/// (Changing their endianness)
pub fn byteSwapAllFields(comptime S: type, ptr: *S) void {
+ byteSwapAllFieldsAligned(S, @alignOf(S), ptr);
+}
+
+/// Swap the byte order of all the members of the fields of a struct
+/// (Changing their endianness)
+pub fn byteSwapAllFieldsAligned(comptime S: type, comptime A: comptime_int, ptr: *align(A) S) void {
switch (@typeInfo(S)) {
- .@"struct" => {
- inline for (std.meta.fields(S)) |f| {
+ .@"struct" => |struct_info| {
+ if (struct_info.backing_integer) |Int| {
+ ptr.* = @bitCast(@byteSwap(@as(Int, @bitCast(ptr.*))));
+ } else inline for (std.meta.fields(S)) |f| {
switch (@typeInfo(f.type)) {
- .@"struct" => |struct_info| if (struct_info.backing_integer) |Int| {
- @field(ptr, f.name) = @bitCast(@byteSwap(@as(Int, @bitCast(@field(ptr, f.name)))));
- } else {
- byteSwapAllFields(f.type, &@field(ptr, f.name));
- },
- .@"union", .array => byteSwapAllFields(f.type, &@field(ptr, f.name)),
+ .@"struct" => byteSwapAllFieldsAligned(f.type, f.alignment, &@field(ptr, f.name)),
+ .@"union", .array => byteSwapAllFieldsAligned(f.type, f.alignment, &@field(ptr, f.name)),
.@"enum" => {
@field(ptr, f.name) = @enumFromInt(@byteSwap(@intFromEnum(@field(ptr, f.name))));
},
@@ -2317,6 +2321,20 @@ test byteSwapAllFields {
f4: bool,
f5: f32,
};
+ const P = packed struct(u32) {
+ f0: u1,
+ f1: u7,
+ f2: u4,
+ f3: u4,
+ f4: u16,
+ };
+ const A = extern struct {
+ f0: u32,
+ f1: extern struct {
+ f0: u64,
+ } align(4),
+ f2: u32,
+ };
var s = T{
.f0 = 0x12,
.f1 = 0x1234,
@@ -2334,8 +2352,16 @@ test byteSwapAllFields {
.f4 = false,
.f5 = @as(f32, @bitCast(@as(u32, 0x45d42800))),
};
+ var p: P = @bitCast(@as(u32, 0x01234567));
+ var a: A = A{
+ .f0 = 0x12345678,
+ .f1 = .{ .f0 = 0x123456789ABCDEF0 },
+ .f2 = 0x87654321,
+ };
byteSwapAllFields(T, &s);
byteSwapAllFields(K, &k);
+ byteSwapAllFields(P, &p);
+ byteSwapAllFields(A, &a);
try std.testing.expectEqual(T{
.f0 = 0x12,
.f1 = 0x3412,
@@ -2353,6 +2379,12 @@ test byteSwapAllFields {
.f4 = false,
.f5 = @as(f32, @bitCast(@as(u32, 0x0028d445))),
}, k);
+ try std.testing.expectEqual(@as(P, @bitCast(@as(u32, 0x67452301))), p);
+ try std.testing.expectEqual(A{
+ .f0 = 0x78563412,
+ .f1 = .{ .f0 = 0xF0DEBC9A78563412 },
+ .f2 = 0x21436587,
+ }, a);
}
/// Reverses the byte order of all elements in a slice.
diff --git a/lib/std/posix.zig b/lib/std/posix.zig
index 52bc5f83e8..e56b9c095c 100644
--- a/lib/std/posix.zig
+++ b/lib/std/posix.zig
@@ -418,12 +418,42 @@ fn getRandomBytesDevURandom(buf: []u8) GetRandomError!void {
const fd = try openZ("/dev/urandom", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
defer close(fd);
- const st = fstat(fd) catch |err| switch (err) {
- error.Streaming => return error.NoDevice,
- else => |e| return e,
- };
- if (!S.ISCHR(st.mode)) {
- return error.NoDevice;
+ switch (native_os) {
+ .linux => {
+ var stx = std.mem.zeroes(linux.Statx);
+ const rc = linux.statx(
+ fd,
+ "",
+ linux.AT.EMPTY_PATH,
+ .{ .TYPE = true },
+ &stx,
+ );
+ switch (errno(rc)) {
+ .SUCCESS => {},
+ .ACCES => unreachable,
+ .BADF => unreachable,
+ .FAULT => unreachable,
+ .INVAL => unreachable,
+ .LOOP => unreachable,
+ .NAMETOOLONG => unreachable,
+ .NOENT => unreachable,
+ .NOMEM => return error.SystemResources,
+ .NOTDIR => unreachable,
+ else => |err| return unexpectedErrno(err),
+ }
+ if (!S.ISCHR(stx.mode)) {
+ return error.NoDevice;
+ }
+ },
+ else => {
+ const st = fstat(fd) catch |err| switch (err) {
+ error.Streaming => return error.NoDevice,
+ else => |e| return e,
+ };
+ if (!S.ISCHR(st.mode)) {
+ return error.NoDevice;
+ }
+ },
}
var i: usize = 0;
diff --git a/lib/std/posix/test.zig b/lib/std/posix/test.zig
index 64845d15ee..6fdb980b1b 100644
--- a/lib/std/posix/test.zig
+++ b/lib/std/posix/test.zig
@@ -364,9 +364,10 @@ test "getrlimit and setrlimit" {
}
test "sigrtmin/max" {
- if (native_os == .wasi or native_os == .windows or native_os.isDarwin() or native_os == .openbsd) {
- return error.SkipZigTest;
- }
+ if (native_os.isDarwin() or switch (native_os) {
+ .wasi, .windows, .openbsd, .dragonfly => true,
+ else => false,
+ }) return error.SkipZigTest;
try expect(posix.sigrtmin() >= 32);
try expect(posix.sigrtmin() >= posix.system.sigrtmin());
@@ -397,7 +398,7 @@ fn reserved_signo(i: usize) bool {
if (!builtin.link_libc) return false;
const max = if (native_os == .netbsd) 32 else 31;
if (i > max) return true;
- if (native_os == .openbsd) return false; // no RT signals
+ if (native_os == .openbsd or native_os == .dragonfly) return false; // no RT signals
return i < posix.sigrtmin();
}
diff --git a/lib/std/zig.zig b/lib/std/zig.zig
index 6212264005..abc213ba27 100644
--- a/lib/std/zig.zig
+++ b/lib/std/zig.zig
@@ -743,6 +743,7 @@ pub const EnvVar = enum {
NO_COLOR,
CLICOLOR_FORCE,
XDG_CACHE_HOME,
+ LOCALAPPDATA,
HOME,
pub fn isSet(comptime ev: EnvVar) bool {
diff --git a/lib/std/zig/WindowsSdk.zig b/lib/std/zig/WindowsSdk.zig
index b0f24c2aca..1b172e4358 100644
--- a/lib/std/zig/WindowsSdk.zig
+++ b/lib/std/zig/WindowsSdk.zig
@@ -860,7 +860,14 @@ const MsvcLibDir = struct {
// %localappdata%\Microsoft\VisualStudio\
// %appdata%\Local\Microsoft\VisualStudio\
- const visualstudio_folder_path = std.fs.getAppDataDir(gpa, "Microsoft\\VisualStudio\\") catch return error.PathNotFound;
+ const local_app_data_path = (std.zig.EnvVar.LOCALAPPDATA.get(gpa) catch |err| switch (err) {
+ error.OutOfMemory => |e| return e,
+ error.InvalidWtf8 => return error.PathNotFound,
+ }) orelse return error.PathNotFound;
+ defer gpa.free(local_app_data_path);
+ const visualstudio_folder_path = try Dir.path.join(gpa, &.{
+ local_app_data_path, "Microsoft\\VisualStudio\\",
+ });
defer gpa.free(visualstudio_folder_path);
const vs_versions: []const []const u8 = vs_versions: {