diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-22 13:21:13 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:51 -0700 |
| commit | df84dc18bc2fa18770591be4d1e0cabc8d4b28c2 (patch) | |
| tree | 2be3888eb740771dc39593a07b032512a3ecab86 /lib/std/Io/Kqueue.zig | |
| parent | d6b0686b055070f390bc3465dbcf678bb590ecae (diff) | |
| download | zig-df84dc18bc2fa18770591be4d1e0cabc8d4b28c2.tar.gz zig-df84dc18bc2fa18770591be4d1e0cabc8d4b28c2.zip | |
add bind
Diffstat (limited to 'lib/std/Io/Kqueue.zig')
| -rw-r--r-- | lib/std/Io/Kqueue.zig | 203 |
1 files changed, 187 insertions, 16 deletions
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index edfa76cd5e..eef959155a 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -9,6 +9,9 @@ const net = std.Io.net; const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; +const posix = std.posix; +const IpAddress = std.Io.net.IpAddress; +const errnoBug = std.Io.Threaded.errnoBug; /// Must be a thread-safe allocator. gpa: Allocator, @@ -97,14 +100,10 @@ fn async( context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { - const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = result; - _ = result_alignment; - _ = context; - _ = context_alignment; - _ = start; - @panic("TODO"); + return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { + start(context.ptr, result.ptr); + return null; + }; } fn concurrent( @@ -156,7 +155,7 @@ fn cancel( fn cancelRequested(userdata: ?*anyopaque) bool { const k: *Kqueue = @ptrCast(@alignCast(userdata)); _ = k; - @panic("TODO"); + return false; // TODO } fn groupAsync( @@ -419,12 +418,23 @@ fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.Accept _ = server; @panic("TODO"); } -fn netBindIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket { - const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = address; - _ = options; - @panic("TODO"); +fn netBindIp( + userdata: ?*anyopaque, + address: *const net.IpAddress, + options: net.IpAddress.BindOptions, +) net.IpAddress.BindError!net.Socket { + const k: *Kqueue = @ptrCast(@alignCast(userdata)); + const family = Io.Threaded.posixAddressFamily(address); + const socket_fd = try openSocketPosix(k, family, options); + errdefer posix.close(socket_fd); + var storage: Io.Threaded.PosixAddress = undefined; + var addr_len = Io.Threaded.addressToPosix(address, &storage); + try posixBind(k, socket_fd, &storage.any, addr_len); + try posixGetSockName(k, socket_fd, &storage.any, &addr_len); + return .{ + .handle = socket_fd, + .address = Io.Threaded.addressFromPosix(&storage), + }; } fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream { const k: *Kqueue = @ptrCast(@alignCast(userdata)); @@ -453,6 +463,7 @@ fn netConnectUnix( _ = unix_address; @panic("TODO"); } + fn netSend( userdata: ?*anyopaque, handle: net.Socket.Handle, @@ -460,12 +471,22 @@ fn netSend( flags: net.SendFlags, ) struct { ?net.Socket.SendError, usize } { const k: *Kqueue = @ptrCast(@alignCast(userdata)); + + const posix_flags: u32 = + @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) | + @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) | + @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) | + @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) | + @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) | + posix.MSG.NOSIGNAL; + _ = k; + _ = posix_flags; _ = handle; _ = outgoing_messages; - _ = flags; @panic("TODO"); } + fn netReceive( userdata: ?*anyopaque, handle: net.Socket.Handle, @@ -533,3 +554,153 @@ fn netLookup( _ = options; @panic("TODO"); } + +fn openSocketPosix( + k: *Kqueue, + family: posix.sa_family_t, + options: IpAddress.BindOptions, +) error{ + AddressFamilyUnsupported, + ProtocolUnsupportedBySystem, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + SystemResources, + ProtocolUnsupportedByAddressFamily, + SocketModeUnsupported, + OptionUnsupported, + Unexpected, + Canceled, +}!posix.socket_t { + const mode = Io.Threaded.posixSocketMode(options.mode); + const protocol = Io.Threaded.posixProtocol(options.protocol); + const socket_fd = while (true) { + try k.checkCancel(); + const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; + const socket_rc = posix.system.socket(family, flags, protocol); + switch (posix.errno(socket_rc)) { + .SUCCESS => { + const fd: posix.fd_t = @intCast(socket_rc); + errdefer posix.close(fd); + if (Io.Threaded.socket_flags_unsupported) { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + } + + var fl_flags: usize = while (true) { + try k.checkCancel(); + const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); + switch (posix.errno(rc)) { + .SUCCESS => break @intCast(rc), + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + }; + fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK")); + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + } + } + break fd; + }, + .INTR => continue, + .CANCELED => return error.Canceled, + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .INVAL => return error.ProtocolUnsupportedBySystem, + .MFILE => return error.ProcessFdQuotaExceeded, + .NFILE => return error.SystemFdQuotaExceeded, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily, + .PROTOTYPE => return error.SocketModeUnsupported, + else => |err| return posix.unexpectedErrno(err), + } + }; + errdefer posix.close(socket_fd); + + if (options.ip6_only) { + if (posix.IPV6 == void) return error.OptionUnsupported; + try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0); + } + + return socket_fd; +} + +fn posixBind( + k: *Kqueue, + socket_fd: posix.socket_t, + addr: *const posix.sockaddr, + addr_len: posix.socklen_t, +) !void { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + + .ADDRINUSE => return error.AddressInUse, + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd` + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .ADDRNOTAVAIL => return error.AddressUnavailable, + .FAULT => |err| return errnoBug(err), // invalid `addr` pointer + .NOMEM => return error.SystemResources, + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .FAULT => |err| return errnoBug(err), + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // always a race condition + .NOBUFS => return error.SystemResources, + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void { + const o: []const u8 = @ptrCast(&option); + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) { + .SUCCESS => return, + .INTR => continue, + .CANCELED => return error.Canceled, + + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .NOTSOCK => |err| return errnoBug(err), + .INVAL => |err| return errnoBug(err), + .FAULT => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn checkCancel(k: *Kqueue) error{Canceled}!void { + if (cancelRequested(k)) return error.Canceled; +} |
