Diffstat (limited to 'lib/std')
 lib/std/Build/Fuzz.zig            |   6
 lib/std/Io.zig                    |  87
 lib/std/Io/Dir.zig                |   5
 lib/std/Io/Kqueue.zig             |  13
 lib/std/Io/Threaded.zig           | 245
 lib/std/Io/Threaded/test.zig      |   2
 lib/std/Io/net.zig                |  14
 lib/std/Io/net/HostName.zig       |   2
 lib/std/Io/net/test.zig           |   2
 lib/std/Io/test.zig               |  37
 lib/std/Progress.zig              |  30
 lib/std/c.zig                     |  15
 lib/std/crypto.zig                |   2
 lib/std/crypto/argon2.zig         |  30
 lib/std/crypto/blake3.zig         |  16
 lib/std/crypto/kangarootwelve.zig |  16
 lib/std/fs.zig                    |   4
 lib/std/fs/get_app_data_dir.zig   |  66
 lib/std/fs/test.zig               |   4
 lib/std/mem.zig                   |  48
 lib/std/posix.zig                 |  42
 lib/std/posix/test.zig            |   9
 lib/std/zig.zig                   |   1
 lib/std/zig/WindowsSdk.zig        |   9
 24 files changed, 486 insertions(+), 219 deletions(-)
diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig
index d308efdf70..5521f7393b 100644
--- a/lib/std/Build/Fuzz.zig
+++ b/lib/std/Build/Fuzz.zig
@@ -78,7 +78,7 @@ pub fn init(
     all_steps: []const *Build.Step,
     root_prog_node: std.Progress.Node,
     mode: Mode,
-) Allocator.Error!Fuzz {
+) error{ OutOfMemory, Canceled }!Fuzz {
     const run_steps: []const *Step.Run = steps: {
         var steps: std.ArrayList(*Step.Run) = .empty;
         defer steps.deinit(gpa);
@@ -98,7 +98,7 @@ pub fn init(
         if (steps.items.len == 0) fatal("no fuzz tests found", .{});
         rebuild_node.setEstimatedTotalItems(steps.items.len);
         const run_steps = try gpa.dupe(*Step.Run, steps.items);
-        rebuild_group.wait(io);
+        try rebuild_group.await(io);
         break :steps run_steps;
     };
     errdefer gpa.free(run_steps);
@@ -517,7 +517,7 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
     assert(fuzz.mode == .limit);
     const io = fuzz.io;
 
-    fuzz.group.wait(io);
+    fuzz.group.awaitUncancelable(io);
     fuzz.group = .init;
 
     std.debug.print("======= FUZZING REPORT =======\n", .{});
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 7c3aa98e16..cf23796db2 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -436,7 +436,7 @@ pub fn Poller(comptime StreamEnum: type) type {
                 // Cancel the pending read into the FIFO.
                 _ = windows.kernel32.CancelIo(handle);
-                // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
+                // We have to wait for the handle to be signalled, i.e. for the cancelation to complete.
                 switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
                     windows.WAIT_OBJECT_0 => {},
                     windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
@@ -631,7 +631,7 @@ pub const VTable = struct {
         /// Copied and then passed to `start`.
         context: []const u8,
         context_alignment: std.mem.Alignment,
-        start: *const fn (*Group, context: *const anyopaque) void,
+        start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
     ) void,
     /// Thread-safe.
     groupConcurrent: *const fn (
@@ -642,9 +642,9 @@ pub const VTable = struct {
         /// Copied and then passed to `start`.
         context: []const u8,
         context_alignment: std.mem.Alignment,
-        start: *const fn (*Group, context: *const anyopaque) void,
+        start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
     ) ConcurrentError!void,
-    groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
+    groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
     groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
     recancel: *const fn (?*anyopaque) void,
@@ -713,8 +713,8 @@ pub const VTable = struct {
     processExecutableOpen: *const fn (?*anyopaque, File.OpenFlags) std.process.OpenExecutableError!File,
     processExecutablePath: *const fn (?*anyopaque, buffer: []u8) std.process.ExecutablePathError!usize,
-    lockStderr: *const fn (?*anyopaque, buffer: []u8, ?Terminal.Mode) Cancelable!LockedStderr,
-    tryLockStderr: *const fn (?*anyopaque, buffer: []u8, ?Terminal.Mode) Cancelable!?LockedStderr,
+    lockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!LockedStderr,
+    tryLockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!?LockedStderr,
     unlockStderr: *const fn (?*anyopaque) void,
 
     processSetCurrentDir: *const fn (?*anyopaque, Dir) std.process.SetCurrentDirError!void,
@@ -734,6 +734,7 @@ pub const VTable = struct {
     netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
     netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize,
     netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void,
+    netShutdown: *const fn (?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void,
     netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
     netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
     netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
@@ -1022,7 +1023,7 @@ pub fn Future(Result: type) type {
         any_future: ?*AnyFuture,
         result: Result,
 
-        /// Equivalent to `await` but places a cancellation request. This causes the task to receive
+        /// Equivalent to `await` but places a cancelation request. This causes the task to receive
         /// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
         /// call to a function in `Io` which can return `error.Canceled`.
         ///
@@ -1070,7 +1071,7 @@ pub const Group = struct {
     /// already been called and completed, or it has successfully been assigned
     /// a unit of concurrency.
     ///
-    /// After this is called, `wait` or `cancel` must be called before the
+    /// After this is called, `await` or `cancel` must be called before the
     /// group is deinitialized.
     ///
     /// Threadsafe.
@@ -1081,21 +1082,21 @@ pub const Group = struct {
     pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
-            fn start(group: *Group, context: *const anyopaque) void {
+            fn start(group: *Group, context: *const anyopaque) Cancelable!void {
                 _ = group;
                 const args_casted: *const Args = @ptrCast(@alignCast(context));
-                @call(.auto, function, args_casted.*);
+                return @call(.auto, function, args_casted.*);
             }
         };
         io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
     }
 
     /// Calls `function` with `args`, such that the function is not guaranteed
-    /// to have returned until `wait` is called, allowing the caller to
+    /// to have returned until `await` is called, allowing the caller to
     /// progress while waiting for any `Io` operations.
     ///
     /// The resource spawned is owned by the group; after this is called,
-    /// `wait` or `cancel` must be called before the group is deinitialized.
+    /// `await` or `cancel` must be called before the group is deinitialized.
     ///
     /// This has stronger guarantee than `async`, placing restrictions on what kind
     /// of `Io` implementations are supported. By calling `async` instead, one
@@ -1109,30 +1110,41 @@ pub const Group = struct {
     pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
-            fn start(group: *Group, context: *const anyopaque) void {
+            fn start(group: *Group, context: *const anyopaque) Cancelable!void {
                 _ = group;
                 const args_casted: *const Args = @ptrCast(@alignCast(context));
-                @call(.auto, function, args_casted.*);
+                return @call(.auto, function, args_casted.*);
             }
         };
         return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
     }
 
     /// Blocks until all tasks of the group finish. During this time,
-    /// cancellation requests propagate to all members of the group.
+    /// cancelation requests propagate to all members of the group.
     ///
     /// Idempotent. Not threadsafe.
     ///
     /// It is safe to call this function concurrently with `Group.async` or
     /// `Group.concurrent`, provided that the group does not complete until
     /// the call to `Group.async` or `Group.concurrent` returns.
-    pub fn wait(g: *Group, io: Io) void {
+    pub fn await(g: *Group, io: Io) Cancelable!void {
         const token = g.token.load(.acquire) orelse return;
-        io.vtable.groupWait(io.userdata, g, token);
+        try io.vtable.groupAwait(io.userdata, g, token);
         assert(g.token.raw == null);
     }
 
-    /// Equivalent to `wait` but immediately requests cancellation on all
+    /// Equivalent to `await` but temporarily blocks cancelation while waiting.
+    pub fn awaitUncancelable(g: *Group, io: Io) void {
+        const token = g.token.load(.acquire) orelse return;
+        const prev = swapCancelProtection(io, .blocked);
+        defer _ = swapCancelProtection(io, prev);
+        io.vtable.groupAwait(io.userdata, g, token) catch |err| switch (err) {
+            error.Canceled => unreachable,
+        };
+        assert(g.token.raw == null);
+    }
+
+    /// Equivalent to `await` but immediately requests cancelation on all
     /// members of the group.
     ///
     /// For a description of cancelation and cancelation points, see `Future.cancel`.
@@ -1253,7 +1265,7 @@ pub fn Select(comptime U: type) type {
         ) void {
             const Args = @TypeOf(args);
             const TypeErased = struct {
-                fn start(group: *Group, context: *const anyopaque) void {
+                fn start(group: *Group, context: *const anyopaque) Cancelable!void {
                     const args_casted: *const Args = @ptrCast(@alignCast(context));
                     const unerased_select: *S = @fieldParentPtr("group", group);
                     const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
@@ -1271,7 +1283,7 @@ pub fn Select(comptime U: type) type {
         /// Asserts there is at least one more `outstanding` task.
         ///
         /// Not threadsafe.
-        pub fn wait(s: *S) Cancelable!U {
+        pub fn await(s: *S) Cancelable!U {
             s.outstanding -= 1;
             return s.queue.getOne(s.io) catch |err| switch (err) {
                 error.Canceled => |e| return e,
@@ -1279,7 +1291,7 @@ pub fn Select(comptime U: type) type {
             };
         }
 
-        /// Equivalent to `wait` but requests cancellation on all remaining
+        /// Equivalent to `await` but requests cancelation on all remaining
         /// tasks owned by the select.
         ///
         /// For a description of cancelation and cancelation points, see `Future.cancel`.
@@ -1336,7 +1348,13 @@ pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, m
     return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters);
 }
 
-pub const Mutex = struct {
+/// Mutex is a synchronization primitive which enforces atomic access to a
+/// shared region of code known as the "critical section".
+///
+/// Mutex is an extern struct so that it may be used as a field inside another
+/// extern struct. Having a guaranteed memory layout including mutexes is
+/// important for IPC over shared memory (mmap).
+pub const Mutex = extern struct {
     state: std.atomic.Value(State),
 
     pub const init: Mutex = .{ .state = .init(.unlocked) };
@@ -2189,6 +2207,23 @@ pub const LockedStderr = struct {
             .mode = ls.terminal_mode,
         };
     }
+
+    pub fn clear(ls: LockedStderr, buffer: []u8) Cancelable!void {
+        const fw = ls.file_writer;
+        std.Progress.clearWrittenWithEscapeCodes(fw) catch |err| switch (err) {
+            error.WriteFailed => switch (fw.err.?) {
+                error.Canceled => |e| return e,
+                else => {},
+            },
+        };
+        fw.interface.flush() catch |err| switch (err) {
+            error.WriteFailed => switch (fw.err.?) {
+                error.Canceled => |e| return e,
+                else => {},
+            },
+        };
+        fw.interface.buffer = buffer;
+    }
 };
 
 /// For doing application-level writes to the standard error stream.
@@ -2199,12 +2234,16 @@ pub const LockedStderr = struct {
 /// See also:
 /// * `tryLockStderr`
 pub fn lockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!LockedStderr {
-    return io.vtable.lockStderr(io.userdata, buffer, terminal_mode);
+    const ls = try io.vtable.lockStderr(io.userdata, terminal_mode);
+    try ls.clear(buffer);
+    return ls;
 }
 
 /// Same as `lockStderr` but non-blocking.
 pub fn tryLockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!?LockedStderr {
-    return io.vtable.tryLockStderr(io.userdata, buffer, terminal_mode);
+    const ls = (try io.vtable.tryLockStderr(io.userdata, terminal_mode)) orelse return null;
+    try ls.clear(buffer);
+    return ls;
 }
 
 pub fn unlockStderr(io: Io) void {
diff --git a/lib/std/Io/Dir.zig b/lib/std/Io/Dir.zig
index 577e114c40..82bf3b927d 100644
--- a/lib/std/Io/Dir.zig
+++ b/lib/std/Io/Dir.zig
@@ -113,6 +113,7 @@ pub const Reader = struct {
         },
         .wasi => @sizeOf(std.os.wasi.dirent_t) +
             std.mem.alignForward(usize, max_name_bytes, @alignOf(std.os.wasi.dirent_t)),
+        .openbsd => std.c.S.BLKSIZE,
         else => if (builtin.link_libc)
             @sizeOf(std.c.dirent)
         else
             std.mem.alignForward(usize, max_name_bytes, @alignOf(usize)),
     };
@@ -373,8 +374,8 @@ pub const Walker = struct {
     /// Leaves the current directory, continuing walking one level up.
     /// If the current entry is a directory entry, then the "current directory"
     /// is the directory pertaining to the current entry.
-    pub fn leave(self: *Walker) void {
-        self.inner.leave();
+    pub fn leave(self: *Walker, io: Io) void {
+        self.inner.leave(io);
     }
 };
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig
index 26b8298cab..df9fa1dee6 100644
--- a/lib/std/Io/Kqueue.zig
+++ b/lib/std/Io/Kqueue.zig
@@ -900,6 +900,7 @@ pub fn io(k: *Kqueue) Io {
         .netConnectIp = netConnectIp,
         .netConnectUnix = netConnectUnix,
         .netClose = netClose,
+        .netShutdown = netShutdown,
         .netRead = netRead,
         .netWrite = netWrite,
         .netSend = netSend,
@@ -1549,12 +1550,22 @@ fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8,
     _ = splat;
     @panic("TODO");
 }
+
 fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
     _ = k;
     _ = handle;
     @panic("TODO");
 }
+
+fn netShutdown(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+    const k: *Kqueue = @ptrCast(@alignCast(userdata));
+    _ = k;
+    _ = handle;
+    _ = how;
+    @panic("TODO");
+}
+
 fn netInterfaceNameResolve(
     userdata: ?*anyopaque,
     name: *const net.Interface.Name,
@@ -1564,12 +1575,14 @@ fn netInterfaceNameResolve(
     _ = name;
     @panic("TODO");
 }
+
 fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
     _ = k;
     _ = interface;
     @panic("TODO");
 }
+
 fn netLookup(
     userdata: ?*anyopaque,
     host_name: net.HostName,
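A note on the API change running through this commit: `Group.wait` becomes `Group.await` and is now a cancelation point, while `awaitUncancelable` covers callers that must not observe `error.Canceled`. A minimal calling sketch against the new surface (the `work` function and its `state` argument are hypothetical):

```zig
var group: std.Io.Group = .init;
group.async(io, work, .{&state});
// `await` is now a cancelation point:
try group.await(io);
// ...or, in cleanup paths where cancelation must not be observed:
// group.awaitUncancelable(io);
```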
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 9e758be9fd..c9e6ba3fc7 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -390,6 +390,52 @@ const Thread = struct {
                     else => unreachable,
                 };
             },
+            .openbsd => {
+                var tm: std.c.timespec = undefined;
+                var tm_ptr: ?*const std.c.timespec = null;
+                if (timeout_ns) |ns| {
+                    tm_ptr = &tm;
+                    tm = timestampToPosix(ns);
+                }
+                if (thread) |t| try t.beginSyscall();
+                const rc = std.c.futex(
+                    ptr,
+                    std.c.FUTEX.WAIT | std.c.FUTEX.PRIVATE_FLAG,
+                    @as(c_int, @bitCast(expect)),
+                    tm_ptr,
+                    null, // uaddr2 is ignored
+                );
+                if (thread) |t| t.endSyscall();
+                if (is_debug) switch (posix.errno(rc)) {
+                    .SUCCESS => {},
+                    .NOSYS => unreachable, // constant op known good value
+                    .AGAIN => {}, // contents of uaddr != val
+                    .INVAL => unreachable, // invalid timeout
+                    .TIMEDOUT => {}, // timeout
+                    .INTR => {}, // a signal arrived
+                    .CANCELED => {}, // a signal arrived and SA_RESTART was set
+                    else => unreachable,
+                };
+            },
+            .dragonfly => {
+                var timeout_us: c_int = undefined;
+                if (timeout_ns) |ns| {
+                    timeout_us = std.math.cast(c_int, ns / std.time.ns_per_us) orelse std.math.maxInt(c_int);
+                } else {
+                    timeout_us = 0;
+                }
+                if (thread) |t| try t.beginSyscall();
+                const rc = std.c.umtx_sleep(@ptrCast(ptr), @bitCast(expect), timeout_us);
+                if (thread) |t| t.endSyscall();
+                if (is_debug) switch (std.posix.errno(rc)) {
+                    .SUCCESS => {},
+                    .BUSY => {}, // ptr != expect
+                    .AGAIN => {}, // maybe timed out, or paged out, or hit 2s kernel refresh
+                    .INTR => {}, // spurious wake
+                    .INVAL => unreachable, // invalid timeout
+                    else => unreachable,
+                };
+            },
             else => if (std.Thread.use_pthreads) {
                 // TODO integrate the following function being called with robust cancelation.
                 return pthreads_futex.wait(ptr, expect, timeout_ns) catch |err| switch (err) {
@@ -473,6 +519,23 @@ const Thread = struct {
                     else => unreachable, // deadlock due to operating system bug
                 }
             },
+            .openbsd => {
+                const rc = std.c.futex(
+                    ptr,
+                    std.c.FUTEX.WAKE | std.c.FUTEX.PRIVATE_FLAG,
+                    @min(max_waiters, std.math.maxInt(c_int)),
+                    null, // timeout is ignored
+                    null, // uaddr2 is ignored
+                );
+                assert(rc >= 0);
+            },
+            .dragonfly => {
+                // will generally return 0 unless the address is bad
+                _ = std.c.umtx_wakeup(
+                    @ptrCast(ptr),
+                    @min(max_waiters, std.math.maxInt(c_int)),
+                );
+            },
             else => if (std.Thread.use_pthreads) {
                 return pthreads_futex.wake(ptr, max_waiters);
             } else {
@@ -795,7 +858,7 @@ pub fn io(t: *Threaded) Io {
 
         .groupAsync = groupAsync,
         .groupConcurrent = groupConcurrent,
-        .groupWait = groupWait,
+        .groupAwait = groupAwait,
         .groupCancel = groupCancel,
         .recancel = recancel,
 
@@ -891,6 +954,10 @@ pub fn io(t: *Threaded) Io {
             else => netConnectUnixPosix,
         },
         .netClose = netClose,
+        .netShutdown = switch (native_os) {
+            .windows => netShutdownWindows,
+            else => netShutdownPosix,
+        },
         .netRead = switch (native_os) {
             .windows => netReadWindows,
             else => netReadPosix,
@@ -929,7 +996,7 @@ pub fn ioBasic(t: *Threaded) Io {
 
         .groupAsync = groupAsync,
         .groupConcurrent = groupConcurrent,
-        .groupWait = groupWait,
+        .groupAwait = groupAwait,
         .groupCancel = groupCancel,
         .recancel = recancel,
 
@@ -1007,6 +1074,7 @@ pub fn ioBasic(t: *Threaded) Io {
         .netConnectIp = netConnectIpUnavailable,
         .netConnectUnix = netConnectUnixUnavailable,
         .netClose = netCloseUnavailable,
+        .netShutdown = netShutdownUnavailable,
         .netRead = netReadUnavailable,
         .netWrite = netWriteUnavailable,
         .netWriteFile = netWriteFileUnavailable,
@@ -1161,6 +1229,7 @@ const AsyncClosure = struct {
         error.Canceled => {
             ac.closure.requestCancel(t);
             ac.event.waitUncancelable(ioBasic(t));
+            recancel(t);
         },
     };
     @memcpy(result, ac.resultPointer()[0..result.len]);
@@ -1273,7 +1342,7 @@ const GroupClosure = struct {
     group: *Io.Group,
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
-    func: *const fn (*Io.Group, context: *anyopaque) void,
+    func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void,
     context_alignment: Alignment,
     alloc_len: usize,
 
@@ -1286,7 +1355,7 @@ const GroupClosure = struct {
         current_thread.current_closure = closure;
         current_thread.cancel_protection = .unblocked;
 
-        gc.func(group, gc.contextPointer());
+        assertResult(closure, gc.func(group, gc.contextPointer()));
 
         current_thread.current_closure = null;
         current_thread.cancel_protection = undefined;
@@ -1296,6 +1365,16 @@ const GroupClosure = struct {
         if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t));
     }
 
+    fn assertResult(closure: *Closure, result: Io.Cancelable!void) void {
+        if (result) |_| switch (closure.cancel_status.unpack()) {
+            .none, .requested => {},
+            .acknowledged => unreachable, // task illegally swallowed error.Canceled
+            .signal_id => unreachable,
+        } else |err| switch (err) {
+            error.Canceled => assert(closure.cancel_status == .acknowledged),
+        }
+    }
+
     fn contextPointer(gc: *GroupClosure) [*]u8 {
         const base: [*]u8 = @ptrCast(gc);
         const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
@@ -1308,7 +1387,7 @@ const GroupClosure = struct {
         group: *Io.Group,
         context: []const u8,
         context_alignment: Alignment,
-        func: *const fn (*Io.Group, context: *const anyopaque) void,
+        func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
     ) Allocator.Error!*GroupClosure {
         const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
         const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
@@ -1346,14 +1425,14 @@ fn groupAsync(
     group: *Io.Group,
     context: []const u8,
     context_alignment: Alignment,
-    start: *const fn (*Io.Group, context: *const anyopaque) void,
+    start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
 ) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    if (builtin.single_threaded) return start(group, context.ptr);
+    if (builtin.single_threaded) return start(group, context.ptr) catch unreachable;
     const gpa = t.allocator;
 
     const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch
-        return start(group, context.ptr);
+        return t.assertGroupResult(start(group, context.ptr));
 
     t.mutex.lock();
 
@@ -1362,7 +1441,7 @@ fn groupAsync(
     if (busy_count >= @intFromEnum(t.async_limit)) {
         t.mutex.unlock();
         gc.deinit(gpa);
-        return start(group, context.ptr);
+        return t.assertGroupResult(start(group, context.ptr));
     }
     t.busy_count = busy_count + 1;
 
@@ -1375,7 +1454,7 @@ fn groupAsync(
         t.busy_count = busy_count;
         t.mutex.unlock();
         gc.deinit(gpa);
-        return start(group, context.ptr);
+        return t.assertGroupResult(start(group, context.ptr));
     };
     thread.detach();
 }
@@ -1396,12 +1475,18 @@ fn groupAsync(
     t.cond.signal();
 }
 
+fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void {
+    const current_thread: *Thread = .getCurrent(t);
+    const current_closure = current_thread.current_closure orelse return;
+    GroupClosure.assertResult(current_closure, result);
+}
+
 fn groupConcurrent(
     userdata: ?*anyopaque,
     group: *Io.Group,
     context: []const u8,
     context_alignment: Alignment,
-    start: *const fn (*Io.Group, context: *const anyopaque) void,
+    start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
 ) Io.ConcurrentError!void {
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
@@ -1447,7 +1532,7 @@ fn groupConcurrent(
     t.cond.signal();
 }
 
-fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void {
+fn groupAwait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const gpa = t.allocator;
@@ -1459,16 +1544,14 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque)
     const event: *Io.Event = @ptrCast(&group.context);
     const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
     assert(prev_state & GroupClosure.sync_is_waiting == 0);
-    if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) {
-        error.Canceled => {
-            var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic)));
-            while (it) |node| : (it = node.next) {
-                const gc: *GroupClosure = @fieldParentPtr("node", node);
-                gc.closure.requestCancel(t);
-            }
-            event.waitUncancelable(ioBasic(t));
-        },
-    };
+    {
+        errdefer _ = group_state.fetchSub(GroupClosure.sync_is_waiting, .monotonic);
+        // This event.wait can return error.Canceled, in which case this logic does
+        // *not* propagate cancel requests to each group member. Instead, the user
+        // code will likely do this with a deferred call to groupCancel, or,
+        // intentionally not do this.
+        if ((prev_state / GroupClosure.sync_one_pending) > 0) try event.wait(ioBasic(t));
+    }
 
     // Since the group has now finished, it's illegal to add more tasks to it until we return. It's
     // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only
@@ -10390,6 +10473,89 @@ fn netCloseUnavailable(userdata: ?*anyopaque, handles: []const net.Socket.Handle
     unreachable; // How you gonna close something that was impossible to open?
 }
 
+fn netShutdownPosix(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+    if (!have_networking) return error.NetworkDown;
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    const current_thread = Thread.getCurrent(t);
+
+    const posix_how: i32 = switch (how) {
+        .recv => posix.SHUT.RD,
+        .send => posix.SHUT.WR,
+        .both => posix.SHUT.RDWR,
+    };
+
+    try current_thread.beginSyscall();
+    while (true) {
+        switch (posix.errno(posix.system.shutdown(handle, posix_how))) {
+            .SUCCESS => {
+                current_thread.endSyscall();
+                return;
+            },
+            .INTR => {
+                try current_thread.checkCancel();
+                continue;
+            },
+            else => |e| {
+                current_thread.endSyscall();
+                switch (e) {
+                    .BADF, .NOTSOCK, .INVAL => |err| return errnoBug(err),
+                    .NOTCONN => return error.SocketUnconnected,
+                    .NOBUFS => return error.SystemResources,
+                    else => |err| return posix.unexpectedErrno(err),
+                }
+            },
+        }
+    }
+}
+
+fn netShutdownWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
+    if (!have_networking) return error.NetworkDown;
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    const current_thread = Thread.getCurrent(t);
+
+    const wsa_how: i32 = switch (how) {
+        .recv => ws2_32.SD_RECEIVE,
+        .send => ws2_32.SD_SEND,
+        .both => ws2_32.SD_BOTH,
+    };
+
+    try current_thread.beginSyscall();
+    while (true) {
+        const rc = ws2_32.shutdown(handle, wsa_how);
+        if (rc != ws2_32.SOCKET_ERROR) {
+            current_thread.endSyscall();
+            return;
+        }
+        switch (ws2_32.WSAGetLastError()) {
+            .EINTR => {
+                try current_thread.checkCancel();
+                continue;
+            },
+            .NOTINITIALISED => {
+                try initializeWsa(t);
+                try current_thread.checkCancel();
+                continue;
+            },
+            else => |e| {
+                current_thread.endSyscall();
+                switch (e) {
+                    .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
+                    .ECONNABORTED => return error.ConnectionAborted,
+                    .ECONNRESET => return error.ConnectionResetByPeer,
+                    .ENETDOWN => return error.NetworkDown,
+                    .ENOTCONN => return error.SocketUnconnected,
+                    .EINVAL, .ENOTSOCK => |err| return wsaErrorBug(err),
+                    else => |err| return windows.unexpectedWSAError(err),
+                }
+            },
+        }
+    }
+}
+
+fn netShutdownUnavailable(_: ?*anyopaque, _: net.Socket.Handle, _: net.ShutdownHow) net.ShutdownError!void {
+    unreachable; // How you gonna shutdown something that was impossible to open?
+}
+
 fn netInterfaceNameResolve(
     userdata: ?*anyopaque,
     name: *const net.Interface.Name,
@@ -10776,33 +10942,21 @@ fn netLookupFallible(
     return error.OptionUnsupported;
 }
 
-fn lockStderr(
-    userdata: ?*anyopaque,
-    buffer: []u8,
-    terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!Io.LockedStderr {
+fn lockStderr(userdata: ?*anyopaque, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!Io.LockedStderr {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     // Only global mutex since this is Threaded.
     std.process.stderr_thread_mutex.lock();
-    return initLockedStderr(t, buffer, terminal_mode);
+    return initLockedStderr(t, terminal_mode);
 }
 
-fn tryLockStderr(
-    userdata: ?*anyopaque,
-    buffer: []u8,
-    terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!?Io.LockedStderr {
+fn tryLockStderr(userdata: ?*anyopaque, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!?Io.LockedStderr {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     // Only global mutex since this is Threaded.
     if (!std.process.stderr_thread_mutex.tryLock()) return null;
-    return try initLockedStderr(t, buffer, terminal_mode);
+    return try initLockedStderr(t, terminal_mode);
 }
 
-fn initLockedStderr(
-    t: *Threaded,
-    buffer: []u8,
-    terminal_mode: ?Io.Terminal.Mode,
-) Io.Cancelable!Io.LockedStderr {
+fn initLockedStderr(t: *Threaded, terminal_mode: ?Io.Terminal.Mode) Io.Cancelable!Io.LockedStderr {
     if (!t.stderr_writer_initialized) {
         const io_t = ioBasic(t);
         if (is_windows) t.stderr_writer.file = .stderr();
@@ -10813,19 +10967,6 @@ fn initLockedStderr(
         const CLICOLOR_FORCE = t.environ.exist.CLICOLOR_FORCE;
         t.stderr_mode = terminal_mode orelse try .detect(io_t, t.stderr_writer.file, NO_COLOR, CLICOLOR_FORCE);
     }
-    std.Progress.clearWrittenWithEscapeCodes(&t.stderr_writer) catch |err| switch (err) {
-        error.WriteFailed => switch (t.stderr_writer.err.?) {
-            error.Canceled => |e| return e,
-            else => {},
-        },
-    };
-    t.stderr_writer.interface.flush() catch |err| switch (err) {
-        error.WriteFailed => switch (t.stderr_writer.err.?) {
-            error.Canceled => |e| return e,
-            else => {},
-        },
-    };
-    t.stderr_writer.interface.buffer = buffer;
     return .{
         .file_writer = &t.stderr_writer,
         .terminal_mode = terminal_mode orelse t.stderr_mode,
diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig
index 8169f6bb37..faf1a5cb64 100644
--- a/lib/std/Io/Threaded/test.zig
+++ b/lib/std/Io/Threaded/test.zig
@@ -124,7 +124,7 @@ test "Group.async context alignment" {
     var group: std.Io.Group = .init;
     var result: ByteArray512 = undefined;
     group.async(io, concatByteArraysResultPtr, .{ a, b, &result });
-    group.wait(io);
+    group.awaitUncancelable(io);
 
     try std.testing.expectEqualSlices(u8, &expected.x, &result.x);
 }
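`GroupClosure.assertResult` above turns a swallowed cancelation into checked illegal behavior: once a task acknowledges a cancel request, it must return `error.Canceled`. A conforming task body follows the pattern the tests later in this commit use (a sketch; the `result` out-parameter is illustrative):

```zig
fn task(io: std.Io, result: *usize) error{Canceled}!void {
    defer result.* = 1;
    io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
        error.Canceled => |e| return e, // propagate, never swallow
        else => {},
    };
}
```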
diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig
index 8b1523fbd3..76a581180d 100644
--- a/lib/std/Io/net.zig
+++ b/lib/std/Io/net.zig
@@ -954,6 +954,16 @@ pub const SendFlags = packed struct(u8) {
     _: u3 = 0,
 };
 
+pub const ShutdownHow = enum { recv, send, both };
+
+pub const ShutdownError = error{
+    ConnectionAborted,
+    ConnectionResetByPeer,
+    NetworkDown,
+    SocketUnconnected,
+    SystemResources,
+} || Io.UnexpectedError || Io.Cancelable;
+
 pub const Interface = struct {
     /// Value 0 indicates `none`.
     index: u32,
@@ -1191,6 +1201,10 @@ pub const Stream = struct {
         io.vtable.netClose(io.userdata, (&s.socket.handle)[0..1]);
     }
 
+    pub fn shutdown(s: *const Stream, io: Io, how: ShutdownHow) ShutdownError!void {
+        return io.vtable.netShutdown(io.userdata, s.socket.handle, how);
+    }
+
     pub const Reader = struct {
         io: Io,
         interface: Io.Reader,
diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig
index 84484b9dc1..03f92cc022 100644
--- a/lib/std/Io/net/HostName.zig
+++ b/lib/std/Io/net/HostName.zig
@@ -289,7 +289,7 @@ pub fn connectMany(
     } else |err| switch (err) {
         error.Canceled => |e| return e,
         error.Closed => {
-            group.wait(io);
+            try group.await(io);
             return lookup_future.await(io);
         },
     }
diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig
index 6ef8c15f4f..45c26f9540 100644
--- a/lib/std/Io/net/test.zig
+++ b/lib/std/Io/net/test.zig
@@ -346,6 +346,8 @@ test "non-blocking tcp server" {
     const len = try socket_file.read(&buf);
     const msg = buf[0..len];
     try testing.expect(mem.eql(u8, msg, "hello from server\n"));
+
+    try stream.shutdown(io, .both);
 }
 
 test "decompress compressed DNS name" {
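`Stream.shutdown` is the public entry point for the new `netShutdown` vtable function. A half-close sketch, assuming `stream` came from a successful connect and that `Stream.close` is used for final teardown:

```zig
try stream.shutdown(io, .send); // peer reads EOF; we can still receive
// ... drain whatever the peer still has to send ...
stream.close(io);
```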
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index a317c822cb..582ad06615 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -194,7 +194,7 @@ test "Group" {
 
     group.async(io, count, .{ 1, 10, &results[0] });
     group.async(io, count, .{ 20, 30, &results[1] });
-    group.wait(io);
+    group.awaitUncancelable(io);
 
     try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
 }
@@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void {
     result.* = sum;
 }
 
-test "Group cancellation" {
+test "Group cancelation" {
     const io = testing.io;
 
     var group: Io.Group = .init;
 
-    var results: [2]usize = undefined;
+    var results: [4]usize = .{ 0, 0, 0, 0 };
 
+    // TODO when robust cancelation is available, make the sleep timeouts much
+    // longer so that it causes the unit test to be failed if not canceled.
+    // https://codeberg.org/ziglang/zig/issues/30049
     group.async(io, sleep, .{ io, &results[0] });
     group.async(io, sleep, .{ io, &results[1] });
+    group.async(io, sleepUncancelable, .{ io, &results[2] });
+    group.async(io, sleepRecancel, .{ io, &results[3] });
     group.cancel(io);
 
-    try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results);
+    try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results);
+}
+
+fn sleep(io: Io, result: *usize) error{Canceled}!void {
+    defer result.* = 1;
+    io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+        error.Canceled => |e| return e,
+        else => {},
+    };
 }
 
-fn sleep(io: Io, result: *usize) void {
-    // TODO when cancellation race bug is fixed, make this timeout much longer so that
-    // it causes the unit test to be failed if not canceled.
+fn sleepUncancelable(io: Io, result: *usize) void {
+    const old_prot = io.swapCancelProtection(.blocked);
+    defer _ = io.swapCancelProtection(old_prot);
     io.sleep(.fromMilliseconds(1), .awake) catch {};
     result.* = 1;
 }
 
+fn sleepRecancel(io: Io, result: *usize) void {
+    io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+        error.Canceled => io.recancel(),
+        else => {},
+    };
+    result.* = 1;
+}
+
 test "Group concurrent" {
     const io = testing.io;
 
@@ -249,7 +270,7 @@ test "Group concurrent" {
         },
     };
 
-    group.wait(io);
+    try group.await(io);
 
     try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
 }
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
index 6baa24d246..f0baca2784 100644
--- a/lib/std/Progress.zig
+++ b/lib/std/Progress.zig
@@ -525,8 +525,8 @@ pub fn start(io: Io, options: Options) Node {
 
     if (switch (global_progress.terminal_mode) {
         .off => unreachable, // handled a few lines above
-        .ansi_escape_codes => io.concurrent(updateThreadRun, .{io}),
-        .windows_api => if (is_windows) io.concurrent(windowsApiUpdateThreadRun, .{io}) else unreachable,
+        .ansi_escape_codes => io.concurrent(updateTask, .{io}),
+        .windows_api => if (is_windows) io.concurrent(windowsApiUpdateTask, .{io}) else unreachable,
     }) |future| {
         global_progress.update_worker = future;
     } else |err| {
@@ -561,18 +561,23 @@ fn wait(io: Io, timeout_ns: u64) bool {
     return resize_flag or (global_progress.cols == 0);
 }
 
-fn updateThreadRun(io: Io) void {
+fn updateTask(io: Io) void {
     // Store this data in the thread so that it does not need to be part of the
     // linker data of the main executable.
     var serialized_buffer: Serialized.Buffer = undefined;
 
+    // In this function we bypass the wrapper code inside `Io.lockStderr` /
+    // `Io.tryLockStderr` in order to avoid clearing the terminal twice.
+    // We still want to go through the `Io` instance however in case it uses a
+    // task-switching mutex.
+
     {
         const resize_flag = wait(io, global_progress.initial_delay_ns);
         if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
         maybeUpdateSize(resize_flag);
 
         const buffer, _ = computeRedraw(&serialized_buffer);
-        if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+        if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
             defer io.unlockStderr();
             global_progress.need_clear = true;
             locked_stderr.file_writer.interface.writeAll(buffer) catch return;
@@ -583,7 +588,7 @@ fn updateTask(io: Io) void {
         const resize_flag = wait(io, global_progress.refresh_rate_ns);
 
         if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
-            const stderr = io.lockStderr(&.{}, null) catch return;
+            const stderr = io.vtable.lockStderr(io.userdata, null) catch return;
             defer io.unlockStderr();
             return clearWrittenWithEscapeCodes(stderr.file_writer) catch {};
         }
@@ -591,7 +596,7 @@ fn updateTask(io: Io) void {
         maybeUpdateSize(resize_flag);
 
         const buffer, _ = computeRedraw(&serialized_buffer);
-        if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+        if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
             defer io.unlockStderr();
             global_progress.need_clear = true;
             locked_stderr.file_writer.interface.writeAll(buffer) catch return;
@@ -607,16 +612,21 @@ fn windowsApiWriteMarker() void {
     _ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null);
 }
 
-fn windowsApiUpdateThreadRun(io: Io) void {
+fn windowsApiUpdateTask(io: Io) void {
     var serialized_buffer: Serialized.Buffer = undefined;
 
+    // In this function we bypass the wrapper code inside `Io.lockStderr` /
+    // `Io.tryLockStderr` in order to avoid clearing the terminal twice.
+    // We still want to go through the `Io` instance however in case it uses a
+    // task-switching mutex.
+
     {
         const resize_flag = wait(io, global_progress.initial_delay_ns);
         if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
         maybeUpdateSize(resize_flag);
         const buffer, const nl_n = computeRedraw(&serialized_buffer);
-        if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+        if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
             defer io.unlockStderr();
             windowsApiWriteMarker();
             global_progress.need_clear = true;
@@ -629,7 +639,7 @@ fn windowsApiUpdateTask(io: Io) void {
         const resize_flag = wait(io, global_progress.refresh_rate_ns);
 
         if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
-            _ = io.lockStderr(&.{}, null) catch return;
+            _ = io.vtable.lockStderr(io.userdata, null) catch return;
             defer io.unlockStderr();
             return clearWrittenWindowsApi() catch {};
         }
@@ -637,7 +647,7 @@ fn windowsApiUpdateTask(io: Io) void {
         maybeUpdateSize(resize_flag);
         const buffer, const nl_n = computeRedraw(&serialized_buffer);
-        if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+        if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| {
             defer io.unlockStderr();
             clearWrittenWindowsApi() catch return;
             windowsApiWriteMarker();
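With the terminal-clearing and buffer installation moved out of the vtable into `Io.lockStderr` / `Io.tryLockStderr` (see the `LockedStderr.clear` addition in Io.zig above), ordinary callers keep the old two-argument surface. A minimal sketch:

```zig
var buf: [256]u8 = undefined;
const stderr = try io.lockStderr(&buf, null); // clears progress output, installs buf
defer io.unlockStderr();
stderr.file_writer.interface.writeAll("error: build failed\n") catch {};
stderr.file_writer.interface.flush() catch {};
```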
diff --git a/lib/std/c.zig b/lib/std/c.zig
index 5f0d2dedf8..e9810aeb09 100644
--- a/lib/std/c.zig
+++ b/lib/std/c.zig
@@ -167,6 +167,18 @@ pub const timespec = switch (native_os) {
     .openbsd, .haiku => extern struct {
         sec: time_t,
         nsec: isize,
+
+        /// For use with `utimensat` and `futimens`.
+        pub const NOW: timespec = .{
+            .sec = 0, // ignored
+            .nsec = -2,
+        };
+
+        /// For use with `utimensat` and `futimens`.
+        pub const OMIT: timespec = .{
+            .sec = 0, // ignored
+            .nsec = -1,
+        };
     },
     else => void,
 };
@@ -2365,6 +2377,8 @@ pub const S = switch (native_os) {
         pub const IWOTH = 0o002;
         pub const IXOTH = 0o001;
 
+        pub const BLKSIZE = 512;
+
         pub fn ISFIFO(m: u32) bool {
             return m & IFMT == IFIFO;
         }
@@ -9676,6 +9690,7 @@ pub const NSIG = switch (native_os) {
     .illumos => 75,
     // https://github.com/SerenityOS/serenity/blob/046c23f567a17758d762a33bdf04bacbfd088f9f/Kernel/API/POSIX/signal_numbers.h#L42
     .openbsd, .serenity => 33,
+    .dragonfly => 64,
     else => {},
 };
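The new `NOW`/`OMIT` constants correspond to OpenBSD/Haiku's `UTIME_NOW` (-2) and `UTIME_OMIT` (-1). A sketch of the intended use, assuming libc is linked, `fd` is an open file descriptor, and `std.c.futimens` is available on the target:

```zig
// Keep atime as-is; set mtime to the current time.
const times: [2]std.c.timespec = .{ std.c.timespec.OMIT, std.c.timespec.NOW };
_ = std.c.futimens(fd, &times);
```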
diff --git a/lib/std/crypto.zig b/lib/std/crypto.zig
index a942d6538f..bbcba6a02b 100644
--- a/lib/std/crypto.zig
+++ b/lib/std/crypto.zig
@@ -184,7 +184,7 @@ pub const pwhash = struct {
     pub const Error = HasherError || error{AllocatorRequired};
     pub const HasherError = KdfError || phc_format.Error;
-    pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError;
+    pub const KdfError = errors.Error || std.mem.Allocator.Error || std.Thread.SpawnError || std.Io.Cancelable;
 
     pub const argon2 = @import("crypto/argon2.zig");
     pub const bcrypt = @import("crypto/bcrypt.zig");
diff --git a/lib/std/crypto/argon2.zig b/lib/std/crypto/argon2.zig
index 42165ca524..aae8400679 100644
--- a/lib/std/crypto/argon2.zig
+++ b/lib/std/crypto/argon2.zig
@@ -2,9 +2,9 @@
 // https://github.com/golang/crypto/tree/master/argon2
 // https://github.com/P-H-C/phc-winner-argon2
 
-const std = @import("std");
 const builtin = @import("builtin");
 
+const std = @import("std");
 const blake2 = crypto.hash.blake2;
 const crypto = std.crypto;
 const Io = std.Io;
@@ -53,23 +53,24 @@ pub const Mode = enum {
 pub const Params = struct {
     const Self = @This();
 
-    /// A [t]ime cost, which defines the amount of computation realized and therefore the execution
+    /// Time cost, which defines the amount of computation realized and therefore the execution
     /// time, given in number of iterations.
     t: u32,
 
-    /// A [m]emory cost, which defines the memory usage, given in kibibytes.
+    /// Memory cost, which defines the memory usage, given in kibibytes.
     m: u32,
 
-    /// A [p]arallelism degree, which defines the number of parallel threads.
+    /// Parallelism degree, which defines the number of independent tasks,
+    /// to be multiplexed onto threads when possible.
     p: u24,
 
-    /// The [secret] parameter, which is used for keyed hashing. This allows a secret key to be input
+    /// The secret parameter, which is used for keyed hashing. This allows a secret key to be input
    /// at hashing time (from some external location) and be folded into the value of the hash. This
    /// means that even if your salts and hashes are compromised, an attacker cannot brute-force to
    /// find the password without the key.
     secret: ?[]const u8 = null,
 
-    /// The [ad] parameter, which is used to fold any additional data into the hash value. Functionally,
+    /// The ad parameter, which is used to fold any additional data into the hash value. Functionally,
     /// this behaves almost exactly like the secret or salt parameters; the ad parameter is folding
     /// into the value of the hash. However, this parameter is used for different data. The salt
     /// should be a random string stored alongside your password. The secret should be a random key
@@ -209,18 +210,18 @@ fn processBlocks(
     threads: u24,
     mode: Mode,
     io: Io,
-) void {
+) Io.Cancelable!void {
     const lanes = memory / threads;
     const segments = lanes / sync_points;
 
     if (builtin.single_threaded or threads == 1) {
-        processBlocksSt(blocks, time, memory, threads, mode, lanes, segments);
+        processBlocksSync(blocks, time, memory, threads, mode, lanes, segments);
     } else {
-        processBlocksMt(blocks, time, memory, threads, mode, lanes, segments, io);
+        try processBlocksAsync(blocks, time, memory, threads, mode, lanes, segments, io);
     }
 }
 
-fn processBlocksSt(
+fn processBlocksSync(
     blocks: *Blocks,
     time: u32,
     memory: u32,
@@ -241,7 +242,7 @@ fn processBlocksSync(
     }
 }
 
-fn processBlocksMt(
+fn processBlocksAsync(
     blocks: *Blocks,
     time: u32,
     memory: u32,
@@ -250,19 +251,20 @@ fn processBlocksAsync(
     lanes: u32,
     segments: u32,
     io: Io,
-) void {
+) Io.Cancelable!void {
     var n: u32 = 0;
     while (n < time) : (n += 1) {
         var slice: u32 = 0;
         while (slice < sync_points) : (slice += 1) {
             var group: Io.Group = .init;
+            defer group.cancel(io);
             var lane: u24 = 0;
             while (lane < threads) : (lane += 1) {
                 group.async(io, processSegment, .{
                     blocks, time, memory, threads, mode, lanes, segments, n, slice, lane,
                 });
             }
-            group.wait(io);
+            try group.await(io);
         }
     }
 }
@@ -503,7 +505,7 @@ pub fn kdf(
     blocks.appendNTimesAssumeCapacity(@splat(0), memory);
 
     initBlocks(&blocks, &h0, memory, params.p);
-    processBlocks(&blocks, params.t, memory, params.p, mode, io);
+    try processBlocks(&blocks, params.t, memory, params.p, mode, io);
     finalize(&blocks, memory, params.p, derived_key);
 }
diff --git a/lib/std/crypto/blake3.zig b/lib/std/crypto/blake3.zig
index 53b28c24a1..1a1afab3ee 100644
--- a/lib/std/crypto/blake3.zig
+++ b/lib/std/crypto/blake3.zig
@@ -1,9 +1,11 @@
-const std = @import("std");
 const builtin = @import("builtin");
+
+const std = @import("std");
 const fmt = std.fmt;
 const mem = std.mem;
 const Io = std.Io;
 const Thread = std.Thread;
+const Allocator = std.mem.Allocator;
 
 const Vec4 = @Vector(4, u32);
 const Vec8 = @Vector(8, u32);
@@ -767,7 +769,7 @@ fn buildMerkleTreeLayerParallel(
     key: [8]u32,
     flags: Flags,
     io: Io,
-) void {
+) Io.Cancelable!void {
     const num_parents = input_cvs.len / 2;
 
     // Process sequentially with SIMD for smaller tree layers to avoid thread overhead
@@ -787,6 +789,7 @@ fn buildMerkleTreeLayerParallel(
     const num_workers = Thread.getCpuCount() catch 1;
     const parents_per_worker = (num_parents + num_workers - 1) / num_workers;
     var group: Io.Group = .init;
+    defer group.cancel(io);
 
     for (0..num_workers) |worker_id| {
         const start_idx = worker_id * parents_per_worker;
@@ -801,7 +804,7 @@ fn buildMerkleTreeLayerParallel(
             .flags = flags,
         }});
     }
-    group.wait(io);
+    try group.await(io);
 }
 
 fn parentOutput(parent_block: []const u8, key: [8]u32, flags: Flags) Output {
@@ -987,7 +990,7 @@ pub const Blake3 = struct {
         d.final(out);
     }
 
-    pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: std.mem.Allocator, io: Io) !void {
+    pub fn hashParallel(b: []const u8, out: []u8, options: Options, allocator: Allocator, io: Io) error{ OutOfMemory, Canceled }!void {
         if (b.len < parallel_threshold) {
             return hash(b, out, options);
         }
@@ -1008,6 +1011,7 @@ pub const Blake3 = struct {
         const num_workers = thread_count;
         const chunks_per_worker = (num_full_chunks + num_workers - 1) / num_workers;
         var group: Io.Group = .init;
+        defer group.cancel(io);
 
         for (0..num_workers) |worker_id| {
             const start_chunk = worker_id * chunks_per_worker;
@@ -1022,7 +1026,7 @@ pub const Blake3 = struct {
                 .flags = flags,
             }});
         }
-        group.wait(io);
+        try group.await(io);
 
         // Build Merkle tree in parallel layers using ping-pong buffers
         const max_intermediate_size = (num_full_chunks + 1) / 2;
@@ -1040,7 +1044,7 @@ pub const Blake3 = struct {
             const has_odd = current_level.len % 2 == 1;
             const next_level_size = num_parents + @intFromBool(has_odd);
 
-            buildMerkleTreeLayerParallel(
+            try buildMerkleTreeLayerParallel(
                 current_level[0 .. num_parents * 2],
                 next_level_buf[0..num_parents],
                 key_words,
diff --git a/lib/std/crypto/kangarootwelve.zig b/lib/std/crypto/kangarootwelve.zig
index 08adb3ba0b..944381ec4b 100644
--- a/lib/std/crypto/kangarootwelve.zig
+++ b/lib/std/crypto/kangarootwelve.zig
@@ -1,9 +1,10 @@
-const std = @import("std");
 const builtin = @import("builtin");
+
+const std = @import("std");
 const crypto = std.crypto;
 const Allocator = std.mem.Allocator;
 const Io = std.Io;
-const Thread = std.Thread;
+const assert = std.debug.assert;
 
 const TurboSHAKE128State = crypto.hash.sha3.TurboShake128(0x06);
 const TurboSHAKE256State = crypto.hash.sha3.TurboShake256(0x06);
@@ -598,7 +599,7 @@ inline fn processNLeaves(
     output: []align(@alignOf(u64)) u8,
 ) void {
     const cv_size = Variant.cv_size;
-    comptime std.debug.assert(cv_size % @sizeOf(u64) == 0);
+    comptime assert(cv_size % @sizeOf(u64) == 0);
 
     if (view.tryGetSlice(j, j + N * chunk_size)) |leaf_data| {
         var leaf_cvs: [N * cv_size]u8 = undefined;
@@ -645,7 +646,7 @@ fn processLeafBatch(comptime Variant: type, ctx: LeafBatchContext) void {
         j += chunk_len;
     }
 
-    std.debug.assert(cvs_offset == ctx.output_cvs.len);
+    assert(cvs_offset == ctx.output_cvs.len);
 }
 
 /// Helper to process N leaves in SIMD and absorb CVs into state
@@ -841,7 +842,7 @@ fn ktMultiThreaded(
     total_len: usize,
     output: []u8,
 ) !void {
-    comptime std.debug.assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
+    comptime assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
 
     const cv_size = Variant.cv_size;
     const StateType = Variant.StateType;
@@ -883,6 +884,7 @@ fn ktMultiThreaded(
     var pending_cv_lens: [256]usize = .{0} ** 256;
 
     var select: Select = .init(io, select_buf);
+    defer select.cancel();
 
     var batches_spawned: usize = 0;
     var next_to_process: usize = 0;
@@ -901,7 +903,7 @@ fn ktMultiThreaded(
             batches_spawned += 1;
         }
 
-        const result = select.wait() catch unreachable;
+        const result = try select.await();
         const batch = result.batch;
         const slot = batch.batch_idx % max_concurrent;
@@ -925,7 +927,7 @@ fn ktMultiThreaded(
         }
     }
 
-    select.group.wait(io);
+    assert(select.outstanding == 0);
 }
 
 if (has_partial_leaf) {
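The crypto changes above converge on one fan-out shape: `defer group.cancel(io)` paired with `try group.await(io)`, so that an error (including cancelation) before or during the await reaps the already-spawned tasks, while after a successful await the deferred cancel is a no-op. In isolation (hypothetical `Job` and `worker`):

```zig
fn fanOut(io: std.Io, jobs: []Job) std.Io.Cancelable!void {
    var group: std.Io.Group = .init;
    defer group.cancel(io); // reaps outstanding tasks on error paths
    for (jobs) |*job| group.async(io, worker, .{job});
    try group.await(io);
}
```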
diff --git a/lib/std/fs.zig b/lib/std/fs.zig
index 5f2d36323a..4d149dbbda 100644
--- a/lib/std/fs.zig
+++ b/lib/std/fs.zig
@@ -6,9 +6,6 @@ const std = @import("std.zig");
 pub const path = @import("fs/path.zig");
 pub const wasi = @import("fs/wasi.zig");
 
-pub const getAppDataDir = @import("fs/get_app_data_dir.zig").getAppDataDir;
-pub const GetAppDataDirError = @import("fs/get_app_data_dir.zig").GetAppDataDirError;
-
 pub const base64_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_".*;
 
 /// Base64 encoder, replacing the standard `+/` with `-_` so that it can be used in a file name on any filesystem.
@@ -25,5 +22,4 @@ pub const max_name_bytes = std.Io.Dir.max_name_bytes;
 test {
     _ = path;
     _ = @import("fs/test.zig");
-    _ = @import("fs/get_app_data_dir.zig");
 }
diff --git a/lib/std/fs/get_app_data_dir.zig b/lib/std/fs/get_app_data_dir.zig
deleted file mode 100644
index 24741206cc..0000000000
--- a/lib/std/fs/get_app_data_dir.zig
+++ /dev/null
@@ -1,66 +0,0 @@
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const unicode = std.unicode;
-const mem = std.mem;
-const fs = std.fs;
-const native_os = builtin.os.tag;
-const posix = std.posix;
-
-pub const GetAppDataDirError = error{
-    OutOfMemory,
-    AppDataDirUnavailable,
-};
-
-/// Caller owns returned memory.
-/// TODO determine if we can remove the allocator requirement
-pub fn getAppDataDir(allocator: mem.Allocator, appname: []const u8) GetAppDataDirError![]u8 {
-    switch (native_os) {
-        .windows => {
-            const local_app_data_dir = std.process.getEnvVarOwned(allocator, "LOCALAPPDATA") catch |err| switch (err) {
-                error.OutOfMemory => |e| return e,
-                else => return error.AppDataDirUnavailable,
-            };
-            defer allocator.free(local_app_data_dir);
-            return fs.path.join(allocator, &[_][]const u8{ local_app_data_dir, appname });
-        },
-        .maccatalyst, .macos => {
-            const home_dir = posix.getenv("HOME") orelse {
-                // TODO look in /etc/passwd
-                return error.AppDataDirUnavailable;
-            };
-            return fs.path.join(allocator, &[_][]const u8{ home_dir, "Library", "Application Support", appname });
-        },
-        .linux, .freebsd, .netbsd, .dragonfly, .openbsd, .illumos, .serenity => {
-            if (posix.getenv("XDG_DATA_HOME")) |xdg| {
-                if (xdg.len > 0) {
-                    return fs.path.join(allocator, &[_][]const u8{ xdg, appname });
-                }
-            }
-
-            const home_dir = posix.getenv("HOME") orelse {
-                // TODO look in /etc/passwd
-                return error.AppDataDirUnavailable;
-            };
-            return fs.path.join(allocator, &[_][]const u8{ home_dir, ".local", "share", appname });
-        },
-        .haiku => {
-            var dir_path_buf: [std.fs.max_path_bytes]u8 = undefined;
-            const rc = std.c.find_directory(.B_USER_SETTINGS_DIRECTORY, -1, true, &dir_path_buf, dir_path_buf.len);
-            const settings_dir = try allocator.dupeZ(u8, mem.sliceTo(&dir_path_buf, 0));
-            defer allocator.free(settings_dir);
-            switch (rc) {
-                0 => return fs.path.join(allocator, &[_][]const u8{ settings_dir, appname }),
-                else => return error.AppDataDirUnavailable,
-            }
-        },
-        else => @compileError("Unsupported OS"),
-    }
-}
-
-test getAppDataDir {
-    if (native_os == .wasi) return error.SkipZigTest;
-
-    // We can't actually validate the result
-    const dir = getAppDataDir(std.testing.allocator, "zig") catch return;
-    defer std.testing.allocator.free(dir);
-}
diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig
index bcb9048e0e..94cf3055e5 100644
--- a/lib/std/fs/test.zig
+++ b/lib/std/fs/test.zig
@@ -645,6 +645,7 @@ fn contains(entries: *const std.array_list.Managed(Dir.Entry), el: Dir.Entry) bo
 
 test "Dir.realPath smoke test" {
     if (native_os == .wasi) return error.SkipZigTest;
+    if (native_os == .openbsd) return error.SkipZigTest;
 
     try testWithAllSupportedPathTypes(struct {
         fn impl(ctx: *TestContext) !void {
@@ -820,7 +821,7 @@ test "file operations on directories" {
     try expectError(error.IsDir, ctx.dir.createFile(io, test_dir_name, .{}));
     try expectError(error.IsDir, ctx.dir.deleteFile(io, test_dir_name));
     switch (native_os) {
-        .dragonfly, .netbsd => {
+        .netbsd => {
             // no error when reading a directory. See https://github.com/ziglang/zig/issues/5732
             const buf = try ctx.dir.readFileAlloc(io, test_dir_name, testing.allocator, .unlimited);
             testing.allocator.free(buf);
@@ -1240,6 +1241,7 @@ test "createDirPath, put some files in it, deleteTreeMinStackSize" {
 
 test "createDirPath in a directory that no longer exists" {
     if (native_os == .windows) return error.SkipZigTest; // Windows returns FileBusy if attempting to remove an open dir
+    if (native_os == .dragonfly) return error.SkipZigTest; // DragonflyBSD does not produce error (hammer2 fs)
 
     const io = testing.io;
diff --git a/lib/std/mem.zig b/lib/std/mem.zig
index 76ee5f6a80..6612c6d618 100644
--- a/lib/std/mem.zig
+++ b/lib/std/mem.zig
@@ -2252,16 +2252,20 @@ test writeVarPackedInt {
 /// Swap the byte order of all the members of the fields of a struct
 /// (Changing their endianness)
 pub fn byteSwapAllFields(comptime S: type, ptr: *S) void {
+    byteSwapAllFieldsAligned(S, @alignOf(S), ptr);
+}
+
+/// Swap the byte order of all the members of the fields of a struct
+/// (Changing their endianness)
+pub fn byteSwapAllFieldsAligned(comptime S: type, comptime A: comptime_int, ptr: *align(A) S) void {
     switch (@typeInfo(S)) {
-        .@"struct" => {
-            inline for (std.meta.fields(S)) |f| {
+        .@"struct" => |struct_info| {
+            if (struct_info.backing_integer) |Int| {
+                ptr.* = @bitCast(@byteSwap(@as(Int, @bitCast(ptr.*))));
+            } else inline for (std.meta.fields(S)) |f| {
                 switch (@typeInfo(f.type)) {
-                    .@"struct" => |struct_info| if (struct_info.backing_integer) |Int| {
-                        @field(ptr, f.name) = @bitCast(@byteSwap(@as(Int, @bitCast(@field(ptr, f.name)))));
-                    } else {
-                        byteSwapAllFields(f.type, &@field(ptr, f.name));
-                    },
-                    .@"union", .array => byteSwapAllFields(f.type, &@field(ptr, f.name)),
+                    .@"struct" => byteSwapAllFieldsAligned(f.type, f.alignment, &@field(ptr, f.name)),
+                    .@"union", .array => byteSwapAllFieldsAligned(f.type, f.alignment, &@field(ptr, f.name)),
                     .@"enum" => {
                         @field(ptr, f.name) = @enumFromInt(@byteSwap(@intFromEnum(@field(ptr, f.name))));
                     },
@@ -2317,6 +2321,20 @@ test byteSwapAllFields {
         f4: bool,
         f5: f32,
     };
+    const P = packed struct(u32) {
+        f0: u1,
+        f1: u7,
+        f2: u4,
+        f3: u4,
+        f4: u16,
+    };
+    const A = extern struct {
+        f0: u32,
+        f1: extern struct {
+            f0: u64,
+        } align(4),
+        f2: u32,
+    };
     var s = T{
         .f0 = 0x12,
         .f1 = 0x1234,
@@ -2334,8 +2352,16 @@ test byteSwapAllFields {
         .f4 = false,
         .f5 = @as(f32, @bitCast(@as(u32, 0x45d42800))),
     };
+    var p: P = @bitCast(@as(u32, 0x01234567));
+    var a: A = A{
+        .f0 = 0x12345678,
+        .f1 = .{ .f0 = 0x123456789ABCDEF0 },
+        .f2 = 0x87654321,
+    };
     byteSwapAllFields(T, &s);
     byteSwapAllFields(K, &k);
+    byteSwapAllFields(P, &p);
+    byteSwapAllFields(A, &a);
     try std.testing.expectEqual(T{
         .f0 = 0x12,
         .f1 = 0x3412,
@@ -2353,6 +2379,12 @@ test byteSwapAllFields {
         .f4 = false,
         .f5 = @as(f32, @bitCast(@as(u32, 0x0028d445))),
     }, k);
+    try std.testing.expectEqual(@as(P, @bitCast(@as(u32, 0x67452301))), p);
+    try std.testing.expectEqual(A{
+        .f0 = 0x78563412,
+        .f1 = .{ .f0 = 0xF0DEBC9A78563412 },
+        .f2 = 0x21436587,
+    }, a);
 }
 
 /// Reverses the byte order of all elements in a slice.
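The packed-struct fast path in `byteSwapAllFields` now swaps the whole backing integer in one `@byteSwap` instead of walking fields. Typical use when loading a foreign-endian record (a sketch; `Header` is an invented example type):

```zig
const builtin = @import("builtin");
const std = @import("std");

const Header = extern struct { magic: u32, len: u32 };

fn loadBigEndianHeader(bytes: *const [@sizeOf(Header)]u8) Header {
    var hdr: Header = @bitCast(bytes.*);
    // The on-disk format is big-endian; swap on little-endian hosts.
    if (builtin.cpu.arch.endian() == .little)
        std.mem.byteSwapAllFields(Header, &hdr);
    return hdr;
}
```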
diff --git a/lib/std/posix.zig b/lib/std/posix.zig
index 52bc5f83e8..e56b9c095c 100644
--- a/lib/std/posix.zig
+++ b/lib/std/posix.zig
@@ -418,12 +418,42 @@ fn getRandomBytesDevURandom(buf: []u8) GetRandomError!void {
     const fd = try openZ("/dev/urandom", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
     defer close(fd);
 
-    const st = fstat(fd) catch |err| switch (err) {
-        error.Streaming => return error.NoDevice,
-        else => |e| return e,
-    };
-    if (!S.ISCHR(st.mode)) {
-        return error.NoDevice;
+    switch (native_os) {
+        .linux => {
+            var stx = std.mem.zeroes(linux.Statx);
+            const rc = linux.statx(
+                fd,
+                "",
+                linux.AT.EMPTY_PATH,
+                .{ .TYPE = true },
+                &stx,
+            );
+            switch (errno(rc)) {
+                .SUCCESS => {},
+                .ACCES => unreachable,
+                .BADF => unreachable,
+                .FAULT => unreachable,
+                .INVAL => unreachable,
+                .LOOP => unreachable,
+                .NAMETOOLONG => unreachable,
+                .NOENT => unreachable,
+                .NOMEM => return error.SystemResources,
+                .NOTDIR => unreachable,
+                else => |err| return unexpectedErrno(err),
+            }
+            if (!S.ISCHR(stx.mode)) {
+                return error.NoDevice;
+            }
+        },
+        else => {
+            const st = fstat(fd) catch |err| switch (err) {
+                error.Streaming => return error.NoDevice,
+                else => |e| return e,
+            };
+            if (!S.ISCHR(st.mode)) {
+                return error.NoDevice;
+            }
+        },
     }
 
     var i: usize = 0;
diff --git a/lib/std/posix/test.zig b/lib/std/posix/test.zig
index 64845d15ee..6fdb980b1b 100644
--- a/lib/std/posix/test.zig
+++ b/lib/std/posix/test.zig
@@ -364,9 +364,10 @@ test "getrlimit and setrlimit" {
 }
 
 test "sigrtmin/max" {
-    if (native_os == .wasi or native_os == .windows or native_os.isDarwin() or native_os == .openbsd) {
-        return error.SkipZigTest;
-    }
+    if (native_os.isDarwin() or switch (native_os) {
+        .wasi, .windows, .openbsd, .dragonfly => true,
+        else => false,
+    }) return error.SkipZigTest;
 
     try expect(posix.sigrtmin() >= 32);
     try expect(posix.sigrtmin() >= posix.system.sigrtmin());
@@ -397,7 +398,7 @@ fn reserved_signo(i: usize) bool {
     if (!builtin.link_libc) return false;
     const max = if (native_os == .netbsd) 32 else 31;
     if (i > max) return true;
-    if (native_os == .openbsd) return false; // no RT signals
+    if (native_os == .openbsd or native_os == .dragonfly) return false; // no RT signals
     return i < posix.sigrtmin();
 }
diff --git a/lib/std/zig.zig b/lib/std/zig.zig
index 6212264005..abc213ba27 100644
--- a/lib/std/zig.zig
+++ b/lib/std/zig.zig
@@ -743,6 +743,7 @@ pub const EnvVar = enum {
     NO_COLOR,
     CLICOLOR_FORCE,
     XDG_CACHE_HOME,
+    LOCALAPPDATA,
     HOME,
 
     pub fn isSet(comptime ev: EnvVar) bool {
diff --git a/lib/std/zig/WindowsSdk.zig b/lib/std/zig/WindowsSdk.zig
index b0f24c2aca..1b172e4358 100644
--- a/lib/std/zig/WindowsSdk.zig
+++ b/lib/std/zig/WindowsSdk.zig
@@ -860,7 +860,14 @@ const MsvcLibDir = struct {
         // %localappdata%\Microsoft\VisualStudio\
         // %appdata%\Local\Microsoft\VisualStudio\
-        const visualstudio_folder_path = std.fs.getAppDataDir(gpa, "Microsoft\\VisualStudio\\") catch return error.PathNotFound;
+        const local_app_data_path = (std.zig.EnvVar.LOCALAPPDATA.get(gpa) catch |err| switch (err) {
+            error.OutOfMemory => |e| return e,
+            error.InvalidWtf8 => return error.PathNotFound,
+        }) orelse return error.PathNotFound;
+        defer gpa.free(local_app_data_path);
+        const visualstudio_folder_path = try Dir.path.join(gpa, &.{
+            local_app_data_path, "Microsoft\\VisualStudio\\",
+        });
         defer gpa.free(visualstudio_folder_path);
 
         const vs_versions: []const []const u8 = vs_versions: {
