From 0d22a00f6fde79f851a7d19c2096c07f541ed0be Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 7 Mar 2018 03:55:52 -0500 Subject: *WIP* async/await TCP server --- std/net.zig | 274 ++++++++++++++++++++++++------------------------------------ 1 file changed, 108 insertions(+), 166 deletions(-) (limited to 'std/net.zig') diff --git a/std/net.zig b/std/net.zig index 1140b6449b..595baae9dd 100644 --- a/std/net.zig +++ b/std/net.zig @@ -1,143 +1,103 @@ const std = @import("index.zig"); -const linux = std.os.linux; const assert = std.debug.assert; -const endian = std.endian; - -// TODO don't trust this file, it bit rotted. start over - -const Connection = struct { - socket_fd: i32, - - pub fn send(c: Connection, buf: []const u8) !usize { - const send_ret = linux.sendto(c.socket_fd, buf.ptr, buf.len, 0, null, 0); - const send_err = linux.getErrno(send_ret); - switch (send_err) { - 0 => return send_ret, - linux.EINVAL => unreachable, - linux.EFAULT => unreachable, - linux.ECONNRESET => return error.ConnectionReset, - linux.EINTR => return error.SigInterrupt, - // TODO there are more possible errors - else => return error.Unexpected, - } +const net = this; +const posix = std.os.posix; +const mem = std.mem; + +pub const Address = struct { + sockaddr: posix.sockaddr, + + pub fn initIp4(ip4: u32, port: u16) Address { + return Address { + .sockaddr = posix.sockaddr { + .in = posix.sockaddr_in { + .family = posix.AF_INET, + .port = std.mem.endianSwapIfLe(u16, port), + .addr = ip4, + .zero = []u8{0} ** 8, + }, + }, + }; } - pub fn recv(c: Connection, buf: []u8) ![]u8 { - const recv_ret = linux.recvfrom(c.socket_fd, buf.ptr, buf.len, 0, null, null); - const recv_err = linux.getErrno(recv_ret); - switch (recv_err) { - 0 => return buf[0..recv_ret], - linux.EINVAL => unreachable, - linux.EFAULT => unreachable, - linux.ENOTSOCK => return error.NotSocket, - linux.EINTR => return error.SigInterrupt, - linux.ENOMEM => return error.OutOfMemory, - linux.ECONNREFUSED => return error.ConnectionRefused, - linux.EBADF => return error.BadFd, - // TODO more error values - else => return error.Unexpected, - } + pub fn initIp6(ip6: &const Ip6Addr, port: u16) Address { + return Address { + .family = posix.AF_INET6, + .sockaddr = posix.sockaddr { + .in6 = posix.sockaddr_in6 { + .family = posix.AF_INET6, + .port = std.mem.endianSwapIfLe(u16, port), + .flowinfo = 0, + .addr = ip6.addr, + .scope_id = ip6.scope_id, + }, + }, + }; } - pub fn close(c: Connection) !void { - switch (linux.getErrno(linux.close(c.socket_fd))) { - 0 => return, - linux.EBADF => unreachable, - linux.EINTR => return error.SigInterrupt, - linux.EIO => return error.Io, - else => return error.Unexpected, + pub fn format(self: &const Address, out_stream: var) !void { + switch (self.sockaddr.in.family) { + posix.AF_INET => { + const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in.port); + const bytes = ([]const u8)((&self.sockaddr.in.addr)[0..1]); + try out_stream.print("{}.{}.{}.{}:{}", bytes[0], bytes[1], bytes[2], bytes[3], native_endian_port); + }, + posix.AF_INET6 => { + const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in6.port); + try out_stream.print("[TODO render ip6 address]:{}", native_endian_port); + }, + else => try out_stream.write("(unrecognized address family)"), } } }; -const Address = struct { - family: u16, - scope_id: u32, - addr: [16]u8, - sort_key: i32, -}; - -pub fn lookup(hostname: []const u8, out_addrs: []Address) ![]Address { - if (hostname.len == 0) { - - unreachable; // TODO - } - - unreachable; // TODO -} +pub fn parseIp4(buf: []const u8) !u32 { + var result: u32 = undefined; + const out_ptr = ([]u8)((&result)[0..1]); -pub fn connectAddr(addr: &Address, port: u16) !Connection { - const socket_ret = linux.socket(addr.family, linux.SOCK_STREAM, linux.PROTO_tcp); - const socket_err = linux.getErrno(socket_ret); - if (socket_err > 0) { - // TODO figure out possible errors from socket() - return error.Unexpected; + var x: u8 = 0; + var index: u8 = 0; + var saw_any_digits = false; + for (buf) |c| { + if (c == '.') { + if (!saw_any_digits) { + return error.InvalidCharacter; + } + if (index == 3) { + return error.InvalidEnd; + } + out_ptr[index] = x; + index += 1; + x = 0; + saw_any_digits = false; + } else if (c >= '0' and c <= '9') { + saw_any_digits = true; + const digit = c - '0'; + if (@mulWithOverflow(u8, x, 10, &x)) { + return error.Overflow; + } + if (@addWithOverflow(u8, x, digit, &x)) { + return error.Overflow; + } + } else { + return error.InvalidCharacter; + } } - const socket_fd = i32(socket_ret); - - const connect_ret = if (addr.family == linux.AF_INET) x: { - var os_addr: linux.sockaddr_in = undefined; - os_addr.family = addr.family; - os_addr.port = endian.swapIfLe(u16, port); - @memcpy((&u8)(&os_addr.addr), &addr.addr[0], 4); - @memset(&os_addr.zero[0], 0, @sizeOf(@typeOf(os_addr.zero))); - break :x linux.connect(socket_fd, (&linux.sockaddr)(&os_addr), @sizeOf(linux.sockaddr_in)); - } else if (addr.family == linux.AF_INET6) x: { - var os_addr: linux.sockaddr_in6 = undefined; - os_addr.family = addr.family; - os_addr.port = endian.swapIfLe(u16, port); - os_addr.flowinfo = 0; - os_addr.scope_id = addr.scope_id; - @memcpy(&os_addr.addr[0], &addr.addr[0], 16); - break :x linux.connect(socket_fd, (&linux.sockaddr)(&os_addr), @sizeOf(linux.sockaddr_in6)); - } else { - unreachable; - }; - const connect_err = linux.getErrno(connect_ret); - if (connect_err > 0) { - switch (connect_err) { - linux.ETIMEDOUT => return error.TimedOut, - else => { - // TODO figure out possible errors from connect() - return error.Unexpected; - }, - } + if (index == 3 and saw_any_digits) { + out_ptr[index] = x; + return result; } - return Connection { - .socket_fd = socket_fd, - }; -} - -pub fn connect(hostname: []const u8, port: u16) !Connection { - var addrs_buf: [1]Address = undefined; - const addrs_slice = try lookup(hostname, addrs_buf[0..]); - const main_addr = &addrs_slice[0]; - - return connectAddr(main_addr, port); -} - -pub fn parseIpLiteral(buf: []const u8) !Address { - - return error.InvalidIpLiteral; + return error.Incomplete; } -fn hexDigit(c: u8) u8 { - // TODO use switch with range - if ('0' <= c and c <= '9') { - return c - '0'; - } else if ('A' <= c and c <= 'Z') { - return c - 'A' + 10; - } else if ('a' <= c and c <= 'z') { - return c - 'a' + 10; - } else { - return @maxValue(u8); - } -} +pub const Ip6Addr = struct { + scope_id: u32, + addr: [16]u8, +}; -fn parseIp6(buf: []const u8) !Address { - var result: Address = undefined; - result.family = linux.AF_INET6; +pub fn parseIp6(buf: []const u8) !Ip6Addr { + var result: Ip6Addr = undefined; result.scope_id = 0; const ip_slice = result.addr[0..]; @@ -156,14 +116,14 @@ fn parseIp6(buf: []const u8) !Address { return error.Overflow; } } else { - return error.InvalidChar; + return error.InvalidCharacter; } } else if (c == ':') { if (!saw_any_digits) { - return error.InvalidChar; + return error.InvalidCharacter; } if (index == 14) { - return error.JunkAtEnd; + return error.InvalidEnd; } ip_slice[index] = @truncate(u8, x >> 8); index += 1; @@ -174,7 +134,7 @@ fn parseIp6(buf: []const u8) !Address { saw_any_digits = false; } else if (c == '%') { if (!saw_any_digits) { - return error.InvalidChar; + return error.InvalidCharacter; } if (index == 14) { ip_slice[index] = @truncate(u8, x >> 8); @@ -185,10 +145,7 @@ fn parseIp6(buf: []const u8) !Address { scope_id = true; saw_any_digits = false; } else { - const digit = hexDigit(c); - if (digit == @maxValue(u8)) { - return error.InvalidChar; - } + const digit = try std.fmt.charToDigit(c, 16); if (@mulWithOverflow(u16, x, 16, &x)) { return error.Overflow; } @@ -216,42 +173,27 @@ fn parseIp6(buf: []const u8) !Address { return error.Incomplete; } -fn parseIp4(buf: []const u8) !u32 { - var result: u32 = undefined; - const out_ptr = ([]u8)((&result)[0..1]); +test "std.net.parseIp4" { + assert((try parseIp4("127.0.0.1")) == std.mem.endianSwapIfLe(u32, 0x7f000001)); - var x: u8 = 0; - var index: u8 = 0; - var saw_any_digits = false; - for (buf) |c| { - if (c == '.') { - if (!saw_any_digits) { - return error.InvalidChar; - } - if (index == 3) { - return error.JunkAtEnd; - } - out_ptr[index] = x; - index += 1; - x = 0; - saw_any_digits = false; - } else if (c >= '0' and c <= '9') { - saw_any_digits = true; - const digit = c - '0'; - if (@mulWithOverflow(u8, x, 10, &x)) { - return error.Overflow; - } - if (@addWithOverflow(u8, x, digit, &x)) { - return error.Overflow; - } - } else { - return error.InvalidChar; - } - } - if (index == 3 and saw_any_digits) { - out_ptr[index] = x; - return result; + testParseIp4Fail("256.0.0.1", error.Overflow); + testParseIp4Fail("x.0.0.1", error.InvalidCharacter); + testParseIp4Fail("127.0.0.1.1", error.InvalidEnd); + testParseIp4Fail("127.0.0.", error.Incomplete); + testParseIp4Fail("100..0.1", error.InvalidCharacter); +} + +fn testParseIp4Fail(buf: []const u8, expected_err: error) void { + if (parseIp4(buf)) |_| { + @panic("expected error"); + } else |e| { + assert(e == expected_err); } +} - return error.Incomplete; +test "std.net.parseIp6" { + const addr = try parseIp6("FF01:0:0:0:0:0:0:FB"); + assert(addr.addr[0] == 0xff); + assert(addr.addr[1] == 0x01); + assert(addr.addr[2] == 0x00); } -- cgit v1.2.3 From cbda0fa78c37dd84821061309469c85a2281174c Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 8 Apr 2018 20:08:40 -0400 Subject: basic tcp server working when used with netcat --- std/event.zig | 22 ++++++++++++++-------- std/net.zig | 6 ++++++ std/os/index.zig | 24 ++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 8 deletions(-) (limited to 'std/net.zig') diff --git a/std/event.zig b/std/event.zig index 7ede8e20d8..2ea5d03865 100644 --- a/std/event.zig +++ b/std/event.zig @@ -5,11 +5,12 @@ const mem = std.mem; const posix = std.os.posix; pub const TcpServer = struct { - handleRequestFn: async(&mem.Allocator) fn (&TcpServer, &const std.net.Address, &const std.os.File) void, + handleRequestFn: async<&mem.Allocator> fn (&TcpServer, &const std.net.Address, &const std.os.File) void, loop: &Loop, sockfd: i32, accept_coro: ?promise, + listen_address: std.net.Address, waiting_for_emfile_node: PromiseNode, @@ -28,18 +29,20 @@ pub const TcpServer = struct { .accept_coro = null, .handleRequestFn = undefined, .waiting_for_emfile_node = undefined, + .listen_address = undefined, }; } pub fn listen(self: &TcpServer, address: &const std.net.Address, - handleRequestFn: async(&mem.Allocator) fn (&TcpServer, &const std.net.Address, &const std.os.File)void) !void + handleRequestFn: async<&mem.Allocator> fn (&TcpServer, &const std.net.Address, &const std.os.File)void) !void { self.handleRequestFn = handleRequestFn; try std.os.posixBind(self.sockfd, &address.sockaddr); try std.os.posixListen(self.sockfd, posix.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd)); - self.accept_coro = try async(self.loop.allocator) (TcpServer.handler)(self); // TODO #817 + self.accept_coro = try async TcpServer.handler(self); errdefer cancel ??self.accept_coro; try self.loop.addFd(self.sockfd, ??self.accept_coro); @@ -60,10 +63,7 @@ pub const TcpServer = struct { posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { var socket = std.os.File.openHandle(accepted_fd); - // TODO #817 - _ = async(self.loop.allocator) (self.handleRequestFn)(self, accepted_addr, - socket) catch |err| switch (err) - { + _ = async self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) { error.OutOfMemory => { socket.close(); continue; @@ -161,7 +161,7 @@ test "listen on a port, send bytes, receive bytes" { const Self = this; - async(&mem.Allocator) fn handler(tcp_server: &TcpServer, _addr: &const std.net.Address, + async<&mem.Allocator> fn handler(tcp_server: &TcpServer, _addr: &const std.net.Address, _socket: &const std.os.File) void { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); @@ -198,5 +198,11 @@ test "listen on a port, send bytes, receive bytes" { defer server.tcp_server.deinit(); try server.tcp_server.listen(addr, MyServer.handler); + var stderr_file = try std.io.getStdErr(); + var stderr_stream = &std.io.FileOutStream.init(&stderr_file).stream; + try stderr_stream.print("\nlistening at "); + try server.tcp_server.listen_address.format(stderr_stream); + try stderr_stream.print("\n"); + loop.run(); } diff --git a/std/net.zig b/std/net.zig index 595baae9dd..3dddffda90 100644 --- a/std/net.zig +++ b/std/net.zig @@ -35,6 +35,12 @@ pub const Address = struct { }; } + pub fn initPosix(addr: &const posix.sockaddr) Address { + return Address { + .sockaddr = *addr, + }; + } + pub fn format(self: &const Address, out_stream: var) !void { switch (self.sockaddr.in.family) { posix.AF_INET => { diff --git a/std/os/index.zig b/std/os/index.zig index 6f9db7edcd..e3ab34b355 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2229,3 +2229,27 @@ pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usiz } } } + +pub const PosixGetSockNameError = error { + /// Insufficient resources were available in the system to perform the operation. + SystemResources, + + Unexpected, +}; + +pub fn posixGetSockName(sockfd: i32) PosixGetSockNameError!posix.sockaddr { + var addr: posix.sockaddr = undefined; + var addrlen: posix.socklen_t = @sizeOf(posix.sockaddr); + const rc = posix.getsockname(sockfd, &addr, &addrlen); + const err = posix.getErrno(rc); + switch (err) { + 0 => return addr, + else => return unexpectedErrorPosix(err), + + posix.EBADF => unreachable, + posix.EFAULT => unreachable, + posix.EINVAL => unreachable, + posix.ENOTSOCK => unreachable, + posix.ENOBUFS => return PosixGetSockNameError.SystemResources, + } +} -- cgit v1.2.3 From e85a10e9f5f6b17736babd321da8dceb72ea17af Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 9 Apr 2018 00:52:45 -0400 Subject: async tcp server proof of concept --- CMakeLists.txt | 1 + src/codegen.cpp | 3 ++ src/ir.cpp | 33 ++++++++---- std/c/darwin.zig | 8 +++ std/event.zig | 45 ++++++++++++---- std/index.zig | 2 +- std/net.zig | 27 +++++++--- std/os/darwin.zig | 3 ++ std/os/index.zig | 133 +++++++++++++++++++++++++++++++++++++++++++++- std/os/linux/index.zig | 12 ++--- test/cases/coroutines.zig | 14 ----- 11 files changed, 231 insertions(+), 50 deletions(-) (limited to 'std/net.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index c6f169d635..b5171d7266 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -432,6 +432,7 @@ set(ZIG_STD_FILES "dwarf.zig" "elf.zig" "empty.zig" + "event.zig" "fmt/errol/enum3.zig" "fmt/errol/index.zig" "fmt/errol/lookup.zig" diff --git a/src/codegen.cpp b/src/codegen.cpp index be83f68349..2aca143524 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -408,6 +408,9 @@ static uint32_t get_err_ret_trace_arg_index(CodeGen *g, FnTableEntry *fn_table_e if (!g->have_err_ret_tracing) { return UINT32_MAX; } + if (fn_table_entry->type_entry->data.fn.fn_type_id.cc == CallingConventionAsync) { + return 0; + } TypeTableEntry *fn_type = fn_table_entry->type_entry; if (!fn_type_can_fail(&fn_type->data.fn.fn_type_id)) { return UINT32_MAX; diff --git a/src/ir.cpp b/src/ir.cpp index 5348e8ba13..0b072cc696 100644 --- a/src/ir.cpp +++ b/src/ir.cpp @@ -2755,9 +2755,10 @@ static IrInstruction *ir_mark_gen(IrInstruction *instruction) { static bool ir_gen_defers_for_block(IrBuilder *irb, Scope *inner_scope, Scope *outer_scope, bool gen_error_defers) { Scope *scope = inner_scope; + bool is_noreturn = false; while (scope != outer_scope) { if (!scope) - return false; + return is_noreturn; if (scope->id == ScopeIdDefer) { AstNode *defer_node = scope->source_node; @@ -2770,14 +2771,18 @@ static bool ir_gen_defers_for_block(IrBuilder *irb, Scope *inner_scope, Scope *o Scope *defer_expr_scope = defer_node->data.defer.expr_scope; IrInstruction *defer_expr_value = ir_gen_node(irb, defer_expr_node, defer_expr_scope); if (defer_expr_value != irb->codegen->invalid_instruction) { - ir_mark_gen(ir_build_check_statement_is_void(irb, defer_expr_scope, defer_expr_node, defer_expr_value)); + if (defer_expr_value->value.type != nullptr && defer_expr_value->value.type->id == TypeTableEntryIdUnreachable) { + is_noreturn = true; + } else { + ir_mark_gen(ir_build_check_statement_is_void(irb, defer_expr_scope, defer_expr_node, defer_expr_value)); + } } } } scope = scope->parent; } - return true; + return is_noreturn; } static void ir_set_cursor_at_end(IrBuilder *irb, IrBasicBlock *basic_block) { @@ -2936,12 +2941,13 @@ static IrInstruction *ir_gen_return(IrBuilder *irb, Scope *scope, AstNode *node, ir_mark_gen(ir_build_cond_br(irb, scope, node, is_err_val, return_block, continue_block, is_comptime)); ir_set_cursor_at_end_and_append_block(irb, return_block); - ir_gen_defers_for_block(irb, scope, outer_scope, true); - IrInstruction *err_val = ir_build_unwrap_err_code(irb, scope, node, err_union_ptr); - if (irb->codegen->have_err_ret_tracing && !should_inline) { - ir_build_save_err_ret_addr(irb, scope, node); + if (!ir_gen_defers_for_block(irb, scope, outer_scope, true)) { + IrInstruction *err_val = ir_build_unwrap_err_code(irb, scope, node, err_union_ptr); + if (irb->codegen->have_err_ret_tracing && !should_inline) { + ir_build_save_err_ret_addr(irb, scope, node); + } + ir_gen_async_return(irb, scope, node, err_val, false); } - ir_gen_async_return(irb, scope, node, err_val, false); ir_set_cursor_at_end_and_append_block(irb, continue_block); IrInstruction *unwrapped_ptr = ir_build_unwrap_err_payload(irb, scope, node, err_union_ptr, false); @@ -5695,7 +5701,7 @@ static IrInstruction *ir_gen_continue(IrBuilder *irb, Scope *continue_scope, Ast IrBasicBlock *dest_block = loop_scope->continue_block; ir_gen_defers_for_block(irb, continue_scope, dest_block->scope, false); - return ir_build_br(irb, continue_scope, node, dest_block, is_comptime); + return ir_mark_gen(ir_build_br(irb, continue_scope, node, dest_block, is_comptime)); } static IrInstruction *ir_gen_error_type(IrBuilder *irb, Scope *scope, AstNode *node) { @@ -6178,7 +6184,7 @@ static IrInstruction *ir_gen_await_expr(IrBuilder *irb, Scope *parent_scope, Ast ir_set_cursor_at_end_and_append_block(irb, cleanup_block); ir_gen_defers_for_block(irb, parent_scope, outer_scope, true); - ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false); + ir_mark_gen(ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false)); ir_set_cursor_at_end_and_append_block(irb, resume_block); IrInstruction *yes_suspend_result = ir_build_load_ptr(irb, parent_scope, node, my_result_var_ptr); @@ -6254,7 +6260,7 @@ static IrInstruction *ir_gen_suspend(IrBuilder *irb, Scope *parent_scope, AstNod ir_set_cursor_at_end_and_append_block(irb, cleanup_block); ir_gen_defers_for_block(irb, parent_scope, outer_scope, true); - ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false); + ir_mark_gen(ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false)); ir_set_cursor_at_end_and_append_block(irb, resume_block); return ir_build_const_void(irb, parent_scope, node); @@ -16746,6 +16752,11 @@ static TypeTableEntry *ir_analyze_instruction_fn_proto(IrAnalyze *ira, IrInstruc return ira->codegen->builtin_types.entry_invalid; if (fn_type_id.cc == CallingConventionAsync) { + if (instruction->async_allocator_type_value == nullptr) { + ir_add_error(ira, &instruction->base, + buf_sprintf("async fn proto missing allocator type")); + return ira->codegen->builtin_types.entry_invalid; + } IrInstruction *async_allocator_type_value = instruction->async_allocator_type_value->other; fn_type_id.async_allocator_type = ir_resolve_type(ira, async_allocator_type_value); if (type_is_invalid(fn_type_id.async_allocator_type)) diff --git a/std/c/darwin.zig b/std/c/darwin.zig index aa49dfa3df..feb689cdc5 100644 --- a/std/c/darwin.zig +++ b/std/c/darwin.zig @@ -55,3 +55,11 @@ pub const dirent = extern struct { d_type: u8, d_name: u8, // field address is address of first byte of name }; + +pub const sockaddr = extern struct { + sa_len: u8, + sa_family: sa_family_t, + sa_data: [14]u8, +}; + +pub const sa_family_t = u8; diff --git a/std/event.zig b/std/event.zig index 2ea5d03865..bdad7fcc18 100644 --- a/std/event.zig +++ b/std/event.zig @@ -1,4 +1,5 @@ const std = @import("index.zig"); +const builtin = @import("builtin"); const assert = std.debug.assert; const event = this; const mem = std.mem; @@ -38,7 +39,7 @@ pub const TcpServer = struct { { self.handleRequestFn = handleRequestFn; - try std.os.posixBind(self.sockfd, &address.sockaddr); + try std.os.posixBind(self.sockfd, &address.os_addr); try std.os.posixListen(self.sockfd, posix.SOMAXCONN); self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd)); @@ -59,7 +60,7 @@ pub const TcpServer = struct { pub async fn handler(self: &TcpServer) void { while (true) { var accepted_addr: std.net.Address = undefined; - if (std.os.posixAccept(self.sockfd, &accepted_addr.sockaddr, + if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { var socket = std.os.File.openHandle(accepted_fd); @@ -118,7 +119,7 @@ pub const Loop = struct { pub fn addFd(self: &Loop, fd: i32, prom: promise) !void { var ev = std.os.linux.epoll_event { - .events = std.os.linux.EPOLLIN|std.os.linux.EPOLLET, + .events = std.os.linux.EPOLLIN|std.os.linux.EPOLLOUT|std.os.linux.EPOLLET, .data = std.os.linux.epoll_data { .ptr = @ptrToInt(prom), }, @@ -155,7 +156,24 @@ pub const Loop = struct { } }; +pub async fn connect(loop: &Loop, _address: &const std.net.Address) !std.os.File { + var address = *_address; // TODO https://github.com/zig-lang/zig/issues/733 + + const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM|posix.SOCK_CLOEXEC|posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer std.os.close(sockfd); + + try std.os.posixConnectAsync(sockfd, &address.os_addr); + try await try async loop.waitFd(sockfd); + try std.os.posixGetSockOptConnectError(sockfd); + + return std.os.File.openHandle(sockfd); +} + test "listen on a port, send bytes, receive bytes" { + if (builtin.os != builtin.Os.linux) { + // TODO build abstractions for other operating systems + return; + } const MyServer = struct { tcp_server: TcpServer, @@ -198,11 +216,20 @@ test "listen on a port, send bytes, receive bytes" { defer server.tcp_server.deinit(); try server.tcp_server.listen(addr, MyServer.handler); - var stderr_file = try std.io.getStdErr(); - var stderr_stream = &std.io.FileOutStream.init(&stderr_file).stream; - try stderr_stream.print("\nlistening at "); - try server.tcp_server.listen_address.format(stderr_stream); - try stderr_stream.print("\n"); - + const p = try async doAsyncTest(&loop, server.tcp_server.listen_address); + defer cancel p; loop.run(); } + +async fn doAsyncTest(loop: &Loop, address: &const std.net.Address) void { + errdefer @panic("test failure"); + + var socket_file = try await try async event.connect(loop, address); + defer socket_file.close(); + + var buf: [512]u8 = undefined; + const amt_read = try socket_file.read(buf[0..]); + const msg = buf[0..amt_read]; + assert(mem.eql(u8, msg, "hello from server\n")); + loop.stop(); +} diff --git a/std/index.zig b/std/index.zig index 7084c55189..07c4360aab 100644 --- a/std/index.zig +++ b/std/index.zig @@ -50,7 +50,7 @@ test "std" { _ = @import("dwarf.zig"); _ = @import("elf.zig"); _ = @import("empty.zig"); - //TODO_ = @import("event.zig"); + _ = @import("event.zig"); _ = @import("fmt/index.zig"); _ = @import("hash/index.zig"); _ = @import("io.zig"); diff --git a/std/net.zig b/std/net.zig index 3dddffda90..8e1b8d97b2 100644 --- a/std/net.zig +++ b/std/net.zig @@ -1,15 +1,26 @@ const std = @import("index.zig"); +const builtin = @import("builtin"); const assert = std.debug.assert; const net = this; const posix = std.os.posix; const mem = std.mem; +pub const TmpWinAddr = struct { + family: u8, + data: [14]u8, +}; + +pub const OsAddress = switch (builtin.os) { + builtin.Os.windows => TmpWinAddr, + else => posix.sockaddr, +}; + pub const Address = struct { - sockaddr: posix.sockaddr, + os_addr: OsAddress, pub fn initIp4(ip4: u32, port: u16) Address { return Address { - .sockaddr = posix.sockaddr { + .os_addr = posix.sockaddr { .in = posix.sockaddr_in { .family = posix.AF_INET, .port = std.mem.endianSwapIfLe(u16, port), @@ -23,7 +34,7 @@ pub const Address = struct { pub fn initIp6(ip6: &const Ip6Addr, port: u16) Address { return Address { .family = posix.AF_INET6, - .sockaddr = posix.sockaddr { + .os_addr = posix.sockaddr { .in6 = posix.sockaddr_in6 { .family = posix.AF_INET6, .port = std.mem.endianSwapIfLe(u16, port), @@ -37,19 +48,19 @@ pub const Address = struct { pub fn initPosix(addr: &const posix.sockaddr) Address { return Address { - .sockaddr = *addr, + .os_addr = *addr, }; } pub fn format(self: &const Address, out_stream: var) !void { - switch (self.sockaddr.in.family) { + switch (self.os_addr.in.family) { posix.AF_INET => { - const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in.port); - const bytes = ([]const u8)((&self.sockaddr.in.addr)[0..1]); + const native_endian_port = std.mem.endianSwapIfLe(u16, self.os_addr.in.port); + const bytes = ([]const u8)((&self.os_addr.in.addr)[0..1]); try out_stream.print("{}.{}.{}.{}:{}", bytes[0], bytes[1], bytes[2], bytes[3], native_endian_port); }, posix.AF_INET6 => { - const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in6.port); + const native_endian_port = std.mem.endianSwapIfLe(u16, self.os_addr.in6.port); try out_stream.print("[TODO render ip6 address]:{}", native_endian_port); }, else => try out_stream.write("(unrecognized address family)"), diff --git a/std/os/darwin.zig b/std/os/darwin.zig index f8b1fbed3b..40da55315c 100644 --- a/std/os/darwin.zig +++ b/std/os/darwin.zig @@ -301,6 +301,9 @@ pub const timespec = c.timespec; pub const Stat = c.Stat; pub const dirent = c.dirent; +pub const sa_family_t = c.sa_family_t; +pub const sockaddr = c.sockaddr; + /// Renamed from `sigaction` to `Sigaction` to avoid conflict with the syscall. pub const Sigaction = struct { handler: extern fn(i32)void, diff --git a/std/os/index.zig b/std/os/index.zig index e3ab34b355..b6caed6f53 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2217,7 +2217,7 @@ pub fn linuxEpollCtl(epfd: i32, op: u32, fd: i32, event: &linux.epoll_event) Lin pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usize { while (true) { - const rc = posix.epoll_wait(epfd, &events[0], u32(events.len), timeout); + const rc = posix.epoll_wait(epfd, events.ptr, u32(events.len), timeout); const err = posix.getErrno(rc); switch (err) { 0 => return rc, @@ -2253,3 +2253,134 @@ pub fn posixGetSockName(sockfd: i32) PosixGetSockNameError!posix.sockaddr { posix.ENOBUFS => return PosixGetSockNameError.SystemResources, } } + +pub const PosixConnectError = error { + /// For UNIX domain sockets, which are identified by pathname: Write permission is denied on the socket + /// file, or search permission is denied for one of the directories in the path prefix. + /// or + /// The user tried to connect to a broadcast address without having the socket broadcast flag enabled or + /// the connection request failed because of a local firewall rule. + PermissionDenied, + + /// Local address is already in use. + AddressInUse, + + /// (Internet domain sockets) The socket referred to by sockfd had not previously been bound to an + /// address and, upon attempting to bind it to an ephemeral port, it was determined that all port numbers + /// in the ephemeral port range are currently in use. See the discussion of + /// /proc/sys/net/ipv4/ip_local_port_range in ip(7). + AddressNotAvailable, + + /// The passed address didn't have the correct address family in its sa_family field. + AddressFamilyNotSupported, + + /// Insufficient entries in the routing cache. + SystemResources, + + /// A connect() on a stream socket found no one listening on the remote address. + ConnectionRefused, + + /// Network is unreachable. + NetworkUnreachable, + + /// Timeout while attempting connection. The server may be too busy to accept new connections. Note + /// that for IP sockets the timeout may be very long when syncookies are enabled on the server. + ConnectionTimedOut, + + Unexpected, +}; + +pub fn posixConnect(sockfd: i32, sockaddr: &const posix.sockaddr) PosixConnectError!void { + while (true) { + const rc = posix.connect(sockfd, sockaddr, @sizeOf(posix.sockaddr)); + const err = posix.getErrno(rc); + switch (err) { + 0 => return, + else => return unexpectedErrorPosix(err), + + posix.EACCES => return PosixConnectError.PermissionDenied, + posix.EPERM => return PosixConnectError.PermissionDenied, + posix.EADDRINUSE => return PosixConnectError.AddressInUse, + posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable, + posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported, + posix.EAGAIN => return PosixConnectError.SystemResources, + posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed. + posix.EBADF => unreachable, // sockfd is not a valid open file descriptor. + posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused, + posix.EFAULT => unreachable, // The socket structure address is outside the user's address space. + posix.EINPROGRESS => unreachable, // The socket is nonblocking and the connection cannot be completed immediately. + posix.EINTR => continue, + posix.EISCONN => unreachable, // The socket is already connected. + posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable, + posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. + posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut, + } + } +} + +/// Same as posixConnect except it is for blocking socket file descriptors. +/// It expects to receive EINPROGRESS. +pub fn posixConnectAsync(sockfd: i32, sockaddr: &const posix.sockaddr) PosixConnectError!void { + while (true) { + const rc = posix.connect(sockfd, sockaddr, @sizeOf(posix.sockaddr)); + const err = posix.getErrno(rc); + switch (err) { + 0, posix.EINPROGRESS => return, + else => return unexpectedErrorPosix(err), + + posix.EACCES => return PosixConnectError.PermissionDenied, + posix.EPERM => return PosixConnectError.PermissionDenied, + posix.EADDRINUSE => return PosixConnectError.AddressInUse, + posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable, + posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported, + posix.EAGAIN => return PosixConnectError.SystemResources, + posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed. + posix.EBADF => unreachable, // sockfd is not a valid open file descriptor. + posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused, + posix.EFAULT => unreachable, // The socket structure address is outside the user's address space. + posix.EINTR => continue, + posix.EISCONN => unreachable, // The socket is already connected. + posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable, + posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. + posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut, + } + } +} + +pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void { + var err_code: i32 = undefined; + var size: u32 = @sizeOf(i32); + const rc = posix.getsockopt(sockfd, posix.SOL_SOCKET, posix.SO_ERROR, @ptrCast(&u8, &err_code), &size); + assert(size == 4); + const err = posix.getErrno(rc); + switch (err) { + 0 => switch (err_code) { + 0 => return, + else => return unexpectedErrorPosix(err), + + posix.EACCES => return PosixConnectError.PermissionDenied, + posix.EPERM => return PosixConnectError.PermissionDenied, + posix.EADDRINUSE => return PosixConnectError.AddressInUse, + posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable, + posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported, + posix.EAGAIN => return PosixConnectError.SystemResources, + posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed. + posix.EBADF => unreachable, // sockfd is not a valid open file descriptor. + posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused, + posix.EFAULT => unreachable, // The socket structure address is outside the user's address space. + posix.EISCONN => unreachable, // The socket is already connected. + posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable, + posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. + posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut, + }, + else => return unexpectedErrorPosix(err), + posix.EBADF => unreachable, // The argument sockfd is not a valid file descriptor. + posix.EFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space. + posix.EINVAL => unreachable, + posix.ENOPROTOOPT => unreachable, // The option is unknown at the level indicated. + posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + } +} diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index 602ff66e74..7f27fc83d9 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -775,12 +775,12 @@ pub fn socket(domain: u32, socket_type: u32, protocol: u32) usize { return syscall3(SYS_socket, domain, socket_type, protocol); } -pub fn setsockopt(fd: i32, level: i32, optname: i32, optval: &const u8, optlen: socklen_t) usize { - return syscall5(SYS_setsockopt, usize(fd), usize(level), usize(optname), usize(optval), @ptrToInt(optlen)); +pub fn setsockopt(fd: i32, level: u32, optname: u32, optval: &const u8, optlen: socklen_t) usize { + return syscall5(SYS_setsockopt, usize(fd), level, optname, usize(optval), @ptrToInt(optlen)); } -pub fn getsockopt(fd: i32, level: i32, optname: i32, noalias optval: &u8, noalias optlen: &socklen_t) usize { - return syscall5(SYS_getsockopt, usize(fd), usize(level), usize(optname), @ptrToInt(optval), @ptrToInt(optlen)); +pub fn getsockopt(fd: i32, level: u32, optname: u32, noalias optval: &u8, noalias optlen: &socklen_t) usize { + return syscall5(SYS_getsockopt, usize(fd), level, optname, @ptrToInt(optval), @ptrToInt(optlen)); } pub fn sendmsg(fd: i32, msg: &const msghdr, flags: u32) usize { @@ -833,14 +833,14 @@ pub fn fstat(fd: i32, stat_buf: &Stat) usize { return syscall2(SYS_fstat, usize(fd), @ptrToInt(stat_buf)); } -pub const epoll_data = extern union { +pub const epoll_data = packed union { ptr: usize, fd: i32, @"u32": u32, @"u64": u64, }; -pub const epoll_event = extern struct { +pub const epoll_event = packed struct { events: u32, data: epoll_data, }; diff --git a/test/cases/coroutines.zig b/test/cases/coroutines.zig index fbd8f08607..6d28b98c9d 100644 --- a/test/cases/coroutines.zig +++ b/test/cases/coroutines.zig @@ -224,17 +224,3 @@ async fn printTrace(p: promise->error!void) void { } }; } - -test "coroutine in a struct field" { - const Foo = struct { - bar: async fn() void, - }; - var foo = Foo { - .bar = simpleAsyncFn2, - }; - cancel try async foo.bar(); -} - -async fn simpleAsyncFn2() void { - suspend; -} -- cgit v1.2.3