diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-16 23:40:32 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:50 -0700 |
| commit | cf6fa219fd05b9f2c01e85557bcd140e72802459 (patch) | |
| tree | 7384deba8832c91ae257180835cc3c957912927f /lib/std | |
| parent | d4215ffaa04b976400bd597cca0cca8182068bf6 (diff) | |
| download | zig-cf6fa219fd05b9f2c01e85557bcd140e72802459.tar.gz zig-cf6fa219fd05b9f2c01e85557bcd140e72802459.zip | |
std.Io.Threaded: fix netWrite cancellation
Move std.posix logic over rather than calling into it.
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/Threaded.zig | 86 | ||||
| -rw-r--r-- | lib/std/Io/net.zig | 33 |
2 files changed, 87 insertions, 32 deletions
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 59f7d4270c..cdaf06a7c0 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -250,6 +250,19 @@ pub fn io(t: *Threaded) Io { }; } +const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩 +const have_accept4 = !socket_flags_unsupported; +const have_flock_open_flags = @hasField(posix.O, "EXLOCK"); +const have_networking = builtin.os.tag != .wasi; +const have_flock = @TypeOf(posix.system.flock) != void; +const have_sendmmsg = builtin.os.tag == .linux; + +const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat; +const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat; +const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat; +const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek; +const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv; + /// Trailing data: /// 1. context /// 2. result @@ -1143,13 +1156,6 @@ fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File. } } -const have_flock = @TypeOf(posix.system.flock) != void; -const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat; -const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat; -const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat; -const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek; -const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv; - fn dirAccessPosix( userdata: ?*anyopaque, dir: Io.Dir, @@ -1277,8 +1283,7 @@ fn dirCreateFilePosix( // Use the O locking flags if the os supports them to acquire the lock // atomically. Note that the NONBLOCK flag is removed after the openat() // call is successful. - const has_flock_open_flags = @hasField(posix.O, "EXLOCK"); - if (has_flock_open_flags) switch (flags.lock) { + if (have_flock_open_flags) switch (flags.lock) { .none => {}, .shared => { os_flags.SHLOCK = true; @@ -1328,7 +1333,7 @@ fn dirCreateFilePosix( }; errdefer posix.close(fd); - if (have_flock and !has_flock_open_flags and flags.lock != .none) { + if (have_flock and !have_flock_open_flags and flags.lock != .none) { const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0; const lock_flags = switch (flags.lock) { .none => unreachable, @@ -1352,7 +1357,7 @@ fn dirCreateFilePosix( } } - if (has_flock_open_flags and flags.lock_nonblocking) { + if (have_flock_open_flags and flags.lock_nonblocking) { var fl_flags: usize = while (true) { try t.checkCancel(); const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); @@ -1476,8 +1481,7 @@ fn dirOpenFile( // Use the O locking flags if the os supports them to acquire the lock // atomically. - const has_flock_open_flags = @hasField(posix.O, "EXLOCK"); - if (has_flock_open_flags) { + if (have_flock_open_flags) { // Note that the NONBLOCK flag is removed after the openat() call // is successful. switch (flags.lock) { @@ -1530,7 +1534,7 @@ fn dirOpenFile( }; errdefer posix.close(fd); - if (have_flock and !has_flock_open_flags and flags.lock != .none) { + if (have_flock and !have_flock_open_flags and flags.lock != .none) { const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0; const lock_flags = switch (flags.lock) { .none => unreachable, @@ -1554,7 +1558,7 @@ fn dirOpenFile( } } - if (has_flock_open_flags and flags.lock_nonblocking) { + if (have_flock_open_flags and flags.lock_nonblocking) { var fl_flags: usize = while (true) { try t.checkCancel(); const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); @@ -1954,7 +1958,7 @@ fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { var ns: std.os.wasi.timestamp_t = undefined; const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns); if (err != .SUCCESS) return error.Unexpected; - return ns; + return .fromNanoseconds(ns); } fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { @@ -2004,7 +2008,7 @@ fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{ .id = clockToWasi(d.clock), - .timeout = std.math.lossyCast(u64, d.duration.nanoseconds), + .timeout = std.math.lossyCast(u64, d.raw.nanoseconds), .precision = 0, .flags = 0, } else .{ @@ -2083,6 +2087,7 @@ fn netListenIpPosix( address: IpAddress, options: IpAddress.ListenOptions, ) IpAddress.ListenError!net.Server { + if (!have_networking) return error.NetworkDown; const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(&address); const socket_fd = try openSocketPosix(t, family, .{ @@ -2230,6 +2235,7 @@ fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sock .ACCES => return error.AccessDenied, .PERM => |err| return errnoBug(err), .NOENT => |err| return errnoBug(err), + .NETDOWN => return error.NetworkDown, else => |err| return posix.unexpectedErrno(err), } } @@ -2306,6 +2312,7 @@ fn netConnectIpPosix( address: *const IpAddress, options: IpAddress.ConnectOptions, ) IpAddress.ConnectError!net.Stream { + if (!have_networking) return error.NetworkDown; if (options.timeout != .none) @panic("TODO"); const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); @@ -2346,6 +2353,7 @@ fn netBindIpPosix( address: *const IpAddress, options: IpAddress.BindOptions, ) IpAddress.BindError!net.Socket { + if (!have_networking) return error.NetworkDown; const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); const socket_fd = try openSocketPosix(t, family, options); @@ -2421,9 +2429,6 @@ fn openSocketPosix( return socket_fd; } -const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩 -const have_accept4 = !socket_flags_unsupported; - fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream { const t: *Threaded = @ptrCast(@alignCast(userdata)); var storage: PosixAddress = undefined; @@ -2534,14 +2539,13 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. } } -const have_sendmmsg = builtin.os.tag == .linux; - fn netSend( userdata: ?*anyopaque, handle: net.Socket.Handle, messages: []net.OutgoingMessage, flags: net.SendFlags, ) struct { ?net.Socket.SendError, usize } { + if (!have_networking) return .{ error.NetworkDown, 0 }; const t: *Threaded = @ptrCast(@alignCast(userdata)); const posix_flags: u32 = @@ -2703,7 +2707,7 @@ fn netSendMany( .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type. .PIPE => return error.SocketUnconnected, .AFNOSUPPORT => return error.AddressFamilyUnsupported, - .HOSTUNREACH => return error.NetworkUnreachable, + .HOSTUNREACH => return error.HostUnreachable, .NETUNREACH => return error.NetworkUnreachable, .NOTCONN => return error.SocketUnconnected, .NETDOWN => return error.NetworkDown, @@ -2720,6 +2724,7 @@ fn netReceive( flags: net.ReceiveFlags, timeout: Io.Timeout, ) struct { ?net.Socket.ReceiveTimeoutError, usize } { + if (!have_networking) return .{ error.NetworkDown, 0 }; const t: *Threaded = @ptrCast(@alignCast(userdata)); // recvmmsg is useless, here's why: @@ -2847,8 +2852,8 @@ fn netWritePosix( data: []const []const u8, splat: usize, ) net.Stream.Writer.Error!usize { + if (!have_networking) return error.NetworkDown; const t: *Threaded = @ptrCast(@alignCast(userdata)); - try t.checkCancel(); var iovecs: [max_iovecs_len]posix.iovec_const = undefined; var msg: posix.msghdr_const = .{ @@ -2889,7 +2894,37 @@ fn netWritePosix( }, }; const flags = posix.MSG.NOSIGNAL; - return posix.sendmsg(fd, &msg, flags); + while (true) { + try t.checkCancel(); + const rc = posix.system.sendmsg(fd, &msg, flags); + switch (posix.errno(rc)) { + .SUCCESS => return @intCast(rc), + .INTR => continue, + .CANCELED => return error.Canceled, + + .ACCES => |err| return errnoBug(err), + .AGAIN => |err| return errnoBug(err), + .ALREADY => return error.FastOpenAlreadyInProgress, + .BADF => |err| return errnoBug(err), // always a race condition + .CONNRESET => return error.ConnectionResetByPeer, + .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set. + .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument. + .INVAL => |err| return errnoBug(err), // Invalid argument passed. + .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified + .MSGSIZE => |err| return errnoBug(err), + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket. + .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type. + .PIPE => return error.SocketUnconnected, + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .HOSTUNREACH => return error.HostUnreachable, + .NETUNREACH => return error.NetworkUnreachable, + .NOTCONN => return error.SocketUnconnected, + .NETDOWN => return error.NetworkDown, + else => |err| return posix.unexpectedErrno(err), + } + } } fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void { @@ -2913,6 +2948,7 @@ fn netInterfaceNameResolve( userdata: ?*anyopaque, name: *const net.Interface.Name, ) net.Interface.Name.ResolveError!net.Interface { + if (!have_networking) return error.InterfaceNotFound; const t: *Threaded = @ptrCast(@alignCast(userdata)); if (native_os == .linux) { diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index 1f47d7e1f5..ca18325e2a 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -309,6 +309,7 @@ pub const IpAddress = union(enum) { AccessDenied, /// Non-blocking was requested and the operation cannot return immediately. WouldBlock, + NetworkDown, } || Io.Timeout.Error || Io.UnexpectedError || Io.Cancelable; pub const ConnectOptions = struct { @@ -1062,7 +1063,7 @@ pub const Socket = struct { AddressFamilyUnsupported, /// Another TCP Fast Open is already in progress. FastOpenAlreadyInProgress, - /// Network connection was unexpectedly closed by recipient. + /// Network session was unexpectedly closed by recipient. ConnectionResetByPeer, /// Local end has been shut down on a connection-oriented socket, or /// the socket was never connected. @@ -1242,15 +1243,33 @@ pub const Stream = struct { stream: Stream, err: ?Error = null, - pub const Error = std.posix.SendMsgError || error{ + pub const Error = error{ + /// Another TCP Fast Open is already in progress. + FastOpenAlreadyInProgress, + /// Network session was unexpectedly closed by recipient. ConnectionResetByPeer, - SocketNotBound, - MessageOversize, - NetworkDown, + /// The output queue for a network interface was full. This generally indicates that the + /// interface has stopped sending, but may be caused by transient congestion. (Normally, + /// this does not occur in Linux. Packets are just silently dropped when a device queue + /// overflows.) + /// + /// This is also caused when there is not enough kernel memory available. SystemResources, + /// No route to network. + NetworkUnreachable, + /// Network reached but no route to host. + HostUnreachable, + /// The local network interface used to reach the destination is down. + NetworkDown, + /// The destination address is not listening. + ConnectionRefused, + /// The passed address didn't have the correct address family in its sa_family field. + AddressFamilyUnsupported, + /// Local end has been shut down on a connection-oriented socket, or + /// the socket was never connected. SocketUnconnected, - Unexpected, - } || Io.Cancelable; + SocketNotBound, + } || Io.UnexpectedError || Io.Cancelable; pub fn init(stream: Stream, io: Io, buffer: []u8) Writer { return .{ |
