aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Io/Kqueue.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2025-10-22 13:21:13 -0700
committerAndrew Kelley <andrew@ziglang.org>2025-10-29 06:20:51 -0700
commitdf84dc18bc2fa18770591be4d1e0cabc8d4b28c2 (patch)
tree2be3888eb740771dc39593a07b032512a3ecab86 /lib/std/Io/Kqueue.zig
parentd6b0686b055070f390bc3465dbcf678bb590ecae (diff)
downloadzig-df84dc18bc2fa18770591be4d1e0cabc8d4b28c2.tar.gz
zig-df84dc18bc2fa18770591be4d1e0cabc8d4b28c2.zip
add bind
Diffstat (limited to 'lib/std/Io/Kqueue.zig')
-rw-r--r--lib/std/Io/Kqueue.zig203
1 files changed, 187 insertions, 16 deletions
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig
index edfa76cd5e..eef959155a 100644
--- a/lib/std/Io/Kqueue.zig
+++ b/lib/std/Io/Kqueue.zig
@@ -9,6 +9,9 @@ const net = std.Io.net;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
+const posix = std.posix;
+const IpAddress = std.Io.net.IpAddress;
+const errnoBug = std.Io.Threaded.errnoBug;
/// Must be a thread-safe allocator.
gpa: Allocator,
@@ -97,14 +100,10 @@ fn async(
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
- const k: *Kqueue = @ptrCast(@alignCast(userdata));
- _ = k;
- _ = result;
- _ = result_alignment;
- _ = context;
- _ = context_alignment;
- _ = start;
- @panic("TODO");
+ return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
+ start(context.ptr, result.ptr);
+ return null;
+ };
}
fn concurrent(
@@ -156,7 +155,7 @@ fn cancel(
fn cancelRequested(userdata: ?*anyopaque) bool {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
- @panic("TODO");
+ return false; // TODO
}
fn groupAsync(
@@ -419,12 +418,23 @@ fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.Accept
_ = server;
@panic("TODO");
}
-fn netBindIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket {
- const k: *Kqueue = @ptrCast(@alignCast(userdata));
- _ = k;
- _ = address;
- _ = options;
- @panic("TODO");
+fn netBindIp(
+ userdata: ?*anyopaque,
+ address: *const net.IpAddress,
+ options: net.IpAddress.BindOptions,
+) net.IpAddress.BindError!net.Socket {
+ const k: *Kqueue = @ptrCast(@alignCast(userdata));
+ const family = Io.Threaded.posixAddressFamily(address);
+ const socket_fd = try openSocketPosix(k, family, options);
+ errdefer posix.close(socket_fd);
+ var storage: Io.Threaded.PosixAddress = undefined;
+ var addr_len = Io.Threaded.addressToPosix(address, &storage);
+ try posixBind(k, socket_fd, &storage.any, addr_len);
+ try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
+ return .{
+ .handle = socket_fd,
+ .address = Io.Threaded.addressFromPosix(&storage),
+ };
}
fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
@@ -453,6 +463,7 @@ fn netConnectUnix(
_ = unix_address;
@panic("TODO");
}
+
fn netSend(
userdata: ?*anyopaque,
handle: net.Socket.Handle,
@@ -460,12 +471,22 @@ fn netSend(
flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
+
+ const posix_flags: u32 =
+ @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
+ @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
+ @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
+ @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
+ @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
+ posix.MSG.NOSIGNAL;
+
_ = k;
+ _ = posix_flags;
_ = handle;
_ = outgoing_messages;
- _ = flags;
@panic("TODO");
}
+
fn netReceive(
userdata: ?*anyopaque,
handle: net.Socket.Handle,
@@ -533,3 +554,153 @@ fn netLookup(
_ = options;
@panic("TODO");
}
+
+fn openSocketPosix(
+ k: *Kqueue,
+ family: posix.sa_family_t,
+ options: IpAddress.BindOptions,
+) error{
+ AddressFamilyUnsupported,
+ ProtocolUnsupportedBySystem,
+ ProcessFdQuotaExceeded,
+ SystemFdQuotaExceeded,
+ SystemResources,
+ ProtocolUnsupportedByAddressFamily,
+ SocketModeUnsupported,
+ OptionUnsupported,
+ Unexpected,
+ Canceled,
+}!posix.socket_t {
+ const mode = Io.Threaded.posixSocketMode(options.mode);
+ const protocol = Io.Threaded.posixProtocol(options.protocol);
+ const socket_fd = while (true) {
+ try k.checkCancel();
+ const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
+ const socket_rc = posix.system.socket(family, flags, protocol);
+ switch (posix.errno(socket_rc)) {
+ .SUCCESS => {
+ const fd: posix.fd_t = @intCast(socket_rc);
+ errdefer posix.close(fd);
+ if (Io.Threaded.socket_flags_unsupported) {
+ while (true) {
+ try k.checkCancel();
+ switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
+ .SUCCESS => break,
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ }
+
+ var fl_flags: usize = while (true) {
+ try k.checkCancel();
+ const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
+ switch (posix.errno(rc)) {
+ .SUCCESS => break @intCast(rc),
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ };
+ fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
+ while (true) {
+ try k.checkCancel();
+ switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
+ .SUCCESS => break,
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ }
+ }
+ break fd;
+ },
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+
+ .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+ .INVAL => return error.ProtocolUnsupportedBySystem,
+ .MFILE => return error.ProcessFdQuotaExceeded,
+ .NFILE => return error.SystemFdQuotaExceeded,
+ .NOBUFS => return error.SystemResources,
+ .NOMEM => return error.SystemResources,
+ .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
+ .PROTOTYPE => return error.SocketModeUnsupported,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ };
+ errdefer posix.close(socket_fd);
+
+ if (options.ip6_only) {
+ if (posix.IPV6 == void) return error.OptionUnsupported;
+ try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
+ }
+
+ return socket_fd;
+}
+
+fn posixBind(
+ k: *Kqueue,
+ socket_fd: posix.socket_t,
+ addr: *const posix.sockaddr,
+ addr_len: posix.socklen_t,
+) !void {
+ while (true) {
+ try k.checkCancel();
+ switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
+ .SUCCESS => break,
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+
+ .ADDRINUSE => return error.AddressInUse,
+ .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+ .INVAL => |err| return errnoBug(err), // invalid parameters
+ .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
+ .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+ .ADDRNOTAVAIL => return error.AddressUnavailable,
+ .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
+ .NOMEM => return error.SystemResources,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ }
+}
+
+fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
+ while (true) {
+ try k.checkCancel();
+ switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
+ .SUCCESS => break,
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+
+ .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+ .FAULT => |err| return errnoBug(err),
+ .INVAL => |err| return errnoBug(err), // invalid parameters
+ .NOTSOCK => |err| return errnoBug(err), // always a race condition
+ .NOBUFS => return error.SystemResources,
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ }
+}
+
+fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
+ const o: []const u8 = @ptrCast(&option);
+ while (true) {
+ try k.checkCancel();
+ switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
+ .SUCCESS => return,
+ .INTR => continue,
+ .CANCELED => return error.Canceled,
+
+ .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+ .NOTSOCK => |err| return errnoBug(err),
+ .INVAL => |err| return errnoBug(err),
+ .FAULT => |err| return errnoBug(err),
+ else => |err| return posix.unexpectedErrno(err),
+ }
+ }
+}
+
+fn checkCancel(k: *Kqueue) error{Canceled}!void {
+ if (cancelRequested(k)) return error.Canceled;
+}