diff options
| -rw-r--r-- | lib/std/Io/Threaded.zig | 153 |
1 files changed, 76 insertions, 77 deletions
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index f61a75f9ba..1957c7f210 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1431,8 +1431,8 @@ fn netReceive( // the split vectors though because reducing the buffer size might make // some messages unreceivable. - // So the strategy instead is to use poll with timeout and then non-blocking - // recvmsg calls. + // So the strategy instead is to use non-blocking recvmsg calls, calling + // poll() with timeout if the first one returns EAGAIN. const posix_flags: u32 = @as(u32, if (flags.oob) posix.MSG.OOB else 0) | @as(u32, if (flags.peek) posix.MSG.PEEK else 0) | @@ -1449,93 +1449,92 @@ fn netReceive( var message_i: usize = 0; var data_i: usize = 0; - // TODO: recvmsg first, then poll if EAGAIN. saves syscall in case the messages are already queued. - const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i }; - poll: while (true) { + recv: while (true) { pool.checkCancel() catch |err| return .{ err, message_i }; - if (message_i > 0 or message_buffer.len - message_i == 0) return .{ null, message_i }; - - const max_poll_ms = std.math.maxInt(u31); - const timeout_ms: u31 = if (deadline) |d| t: { - const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i }; - if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i }; - break :t @intCast(@min(max_poll_ms, duration.toMilliseconds())); - } else max_poll_ms; + if (message_buffer.len - message_i == 0) return .{ null, message_i }; + const message = &message_buffer[message_i]; + const remaining_data_buffer = data_buffer[data_i..]; + var storage: PosixAddress = undefined; + var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len }; + var msg: posix.msghdr = .{ + .name = &storage.any, + .namelen = @sizeOf(PosixAddress), + .iov = (&iov)[0..1], + .iovlen = 1, + .control = message.control.ptr, + .controllen = message.control.len, + .flags = undefined, + }; - const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms); - switch (posix.errno(poll_rc)) { + const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags); + switch (posix.errno(recv_rc)) { .SUCCESS => { - if (poll_rc == 0) { - // Possibly spurious timeout. - if (deadline == null) continue; - return .{ error.Timeout, message_i }; - } - - // Proceed to recvmsg. - while (true) { - pool.checkCancel() catch |err| return .{ err, message_i }; - - const message = &message_buffer[message_i]; - const remaining_data_buffer = data_buffer[data_i..]; - var storage: PosixAddress = undefined; - var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len }; - var msg: posix.msghdr = .{ - .name = &storage.any, - .namelen = @sizeOf(PosixAddress), - .iov = (&iov)[0..1], - .iovlen = 1, - .control = message.control.ptr, - .controllen = message.control.len, - .flags = undefined, - }; - - const rc = posix.system.recvmsg(handle, &msg, posix_flags); - switch (posix.errno(rc)) { - .SUCCESS => { - const data = remaining_data_buffer[0..@intCast(rc)]; - data_i += data.len; - message.* = .{ - .from = addressFromPosix(&storage), - .data = data, - .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control, - .flags = .{ - .eor = (msg.flags & posix.MSG.EOR) != 0, - .trunc = (msg.flags & posix.MSG.TRUNC) != 0, - .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0, - .oob = (msg.flags & posix.MSG.OOB) != 0, - .errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0, - }, - }; - message_i += 1; - continue; - }, - .AGAIN => continue :poll, - .BADF => |err| return .{ errnoBug(err), message_i }, - .NFILE => return .{ error.SystemFdQuotaExceeded, message_i }, - .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i }, - .INTR => continue, - .FAULT => |err| return .{ errnoBug(err), message_i }, - .INVAL => |err| return .{ errnoBug(err), message_i }, - .NOBUFS => return .{ error.SystemResources, message_i }, - .NOMEM => return .{ error.SystemResources, message_i }, - .NOTCONN => return .{ error.SocketUnconnected, message_i }, - .NOTSOCK => |err| return .{ errnoBug(err), message_i }, - .MSGSIZE => return .{ error.MessageOversize, message_i }, - .PIPE => return .{ error.SocketUnconnected, message_i }, - .OPNOTSUPP => |err| return .{ errnoBug(err), message_i }, - .CONNRESET => return .{ error.ConnectionResetByPeer, message_i }, - .NETDOWN => return .{ error.NetworkDown, message_i }, - else => |err| return .{ posix.unexpectedErrno(err), message_i }, - } + const data = remaining_data_buffer[0..@intCast(recv_rc)]; + data_i += data.len; + message.* = .{ + .from = addressFromPosix(&storage), + .data = data, + .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control, + .flags = .{ + .eor = (msg.flags & posix.MSG.EOR) != 0, + .trunc = (msg.flags & posix.MSG.TRUNC) != 0, + .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0, + .oob = (msg.flags & posix.MSG.OOB) != 0, + .errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0, + }, + }; + message_i += 1; + continue; + }, + .AGAIN => while (true) { + pool.checkCancel() catch |err| return .{ err, message_i }; + if (message_i != 0) return .{ null, message_i }; + + const max_poll_ms = std.math.maxInt(u31); + const timeout_ms: u31 = if (deadline) |d| t: { + const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i }; + if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i }; + break :t @intCast(@min(max_poll_ms, duration.toMilliseconds())); + } else max_poll_ms; + + const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms); + switch (posix.errno(poll_rc)) { + .SUCCESS => { + if (poll_rc == 0) { + // Although spurious timeouts are OK, when no deadline + // is passed we must not return `error.Timeout`. + if (deadline == null) continue; + return .{ error.Timeout, message_i }; + } + continue :recv; + }, + .INTR => continue, + + .FAULT => |err| return .{ errnoBug(err), message_i }, + .INVAL => |err| return .{ errnoBug(err), message_i }, + .NOMEM => return .{ error.SystemResources, message_i }, + else => |err| return .{ posix.unexpectedErrno(err), message_i }, } }, .INTR => continue, + + .BADF => |err| return .{ errnoBug(err), message_i }, + .NFILE => return .{ error.SystemFdQuotaExceeded, message_i }, + .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i }, .FAULT => |err| return .{ errnoBug(err), message_i }, .INVAL => |err| return .{ errnoBug(err), message_i }, + .NOBUFS => return .{ error.SystemResources, message_i }, .NOMEM => return .{ error.SystemResources, message_i }, + .NOTCONN => return .{ error.SocketUnconnected, message_i }, + .NOTSOCK => |err| return .{ errnoBug(err), message_i }, + .MSGSIZE => return .{ error.MessageOversize, message_i }, + .PIPE => return .{ error.SocketUnconnected, message_i }, + .OPNOTSUPP => |err| return .{ errnoBug(err), message_i }, + .CONNRESET => return .{ error.ConnectionResetByPeer, message_i }, + .NETDOWN => return .{ error.NetworkDown, message_i }, else => |err| return .{ posix.unexpectedErrno(err), message_i }, } } |
