Diffstat (limited to 'lib/std')
 -rw-r--r--  lib/std/Io.zig               303
 -rw-r--r--  lib/std/Io/Threaded.zig       33
 -rw-r--r--  lib/std/Io/net/HostName.zig  107
 -rw-r--r--  lib/std/Io/net/test.zig       30
 -rw-r--r--  lib/std/Io/test.zig           61
 5 files changed, 370 insertions, 164 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 7c5c0ed70a..367871787c 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -701,7 +701,7 @@ pub const VTable = struct {
     netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
     netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
     netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
-    netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void,
+    netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
 };
 
 pub const Cancelable = error{
@@ -1208,7 +1208,9 @@ pub fn Select(comptime U: type) type {
                 const args_casted: *const Args = @ptrCast(@alignCast(context));
                 const unerased_select: *S = @fieldParentPtr("group", group);
                 const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
-                unerased_select.queue.putOneUncancelable(unerased_select.io, elem);
+                unerased_select.queue.putOneUncancelable(unerased_select.io, elem) catch |err| switch (err) {
+                    error.Closed => unreachable,
+                };
             }
         };
         _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
@@ -1222,7 +1224,10 @@ pub fn Select(comptime U: type) type {
         /// Not threadsafe.
        pub fn wait(s: *S) Cancelable!U {
             s.outstanding -= 1;
-            return s.queue.getOne(s.io);
+            return s.queue.getOne(s.io) catch |err| switch (err) {
+                error.Canceled => |e| return e,
+                error.Closed => unreachable,
+            };
         }
 
         /// Equivalent to `wait` but requests cancellation on all remaining
@@ -1569,8 +1574,11 @@ pub const Event = enum(u32) {
     }
 };
 
+pub const QueueClosedError = error{Closed};
+
 pub const TypeErasedQueue = struct {
     mutex: Mutex,
+    closed: bool,
 
     /// Ring buffer. This data is logically *after* queued getters.
     buffer: []u8,
@@ -1582,12 +1590,14 @@ pub const TypeErasedQueue = struct {
 
     const Put = struct {
         remaining: []const u8,
+        needed: usize,
         condition: Condition,
         node: std.DoublyLinkedList.Node,
     };
 
     const Get = struct {
         remaining: []u8,
+        needed: usize,
         condition: Condition,
         node: std.DoublyLinkedList.Node,
     };
@@ -1595,6 +1605,7 @@ pub const TypeErasedQueue = struct {
     pub fn init(buffer: []u8) TypeErasedQueue {
         return .{
             .mutex = .init,
+            .closed = false,
            .buffer = buffer,
             .start = 0,
             .len = 0,
@@ -1603,7 +1614,27 @@ pub const TypeErasedQueue = struct {
         };
     }
 
-    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
+    pub fn close(q: *TypeErasedQueue, io: Io) void {
+        q.mutex.lockUncancelable(io);
+        defer q.mutex.unlock(io);
+        q.closed = true;
+        {
+            var it = q.getters.first;
+            while (it) |node| : (it = node.next) {
+                const getter: *Get = @alignCast(@fieldParentPtr("node", node));
+                getter.condition.signal(io);
+            }
+        }
+        {
+            var it = q.putters.first;
+            while (it) |node| : (it = node.next) {
+                const putter: *Put = @alignCast(@fieldParentPtr("node", node));
+                putter.condition.signal(io);
+            }
+        }
+    }
+
+    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) (QueueClosedError || Cancelable)!usize {
         assert(elements.len >= min);
         if (elements.len == 0) return 0;
         try q.mutex.lock(io);
@@ -1614,13 +1645,14 @@ pub const TypeErasedQueue = struct {
     /// Same as `put`, except does not introduce a cancelation point.
     ///
     /// For a description of cancelation and cancelation points, see `Future.cancel`.
-    pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+    pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) QueueClosedError!usize {
         assert(elements.len >= min);
         if (elements.len == 0) return 0;
         q.mutex.lockUncancelable(io);
         defer q.mutex.unlock(io);
         return q.putLocked(io, elements, min, true) catch |err| switch (err) {
             error.Canceled => unreachable,
+            error.Closed => |e| return e,
         };
     }
 
@@ -1634,49 +1666,79 @@ pub const TypeErasedQueue = struct {
         return if (slice.len > 0) slice else null;
     }
 
-    fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize {
+    fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
+        // A closed queue cannot be added to, even if there is space in the buffer.
+        if (q.closed) return error.Closed;
+
         // Getters have first priority on the data, and only when the getters
         // queue is empty do we start populating the buffer.
-        var remaining = elements;
+
+        // The number of elements we add immediately, before possibly blocking.
+        var n: usize = 0;
+
         while (q.getters.popFirst()) |getter_node| {
             const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
-            const copy_len = @min(getter.remaining.len, remaining.len);
+            const copy_len = @min(getter.remaining.len, elements.len - n);
             assert(copy_len > 0);
-            @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]);
-            remaining = remaining[copy_len..];
+            @memcpy(getter.remaining[0..copy_len], elements[n..][0..copy_len]);
             getter.remaining = getter.remaining[copy_len..];
-            if (getter.remaining.len == 0) {
+            getter.needed -|= copy_len;
+            n += copy_len;
+            if (getter.needed == 0) {
                 getter.condition.signal(io);
-                if (remaining.len > 0) continue;
-            } else q.getters.prepend(getter_node);
-            assert(remaining.len == 0);
-            return elements.len;
+            } else {
+                assert(n == elements.len); // we didn't have enough elements for the getter
+                q.getters.prepend(getter_node);
+            }
+            if (n == elements.len) return elements.len;
         }
 
         while (q.puttableSlice()) |slice| {
-            const copy_len = @min(slice.len, remaining.len);
+            const copy_len = @min(slice.len, elements.len - n);
             assert(copy_len > 0);
-            @memcpy(slice[0..copy_len], remaining[0..copy_len]);
+            @memcpy(slice[0..copy_len], elements[n..][0..copy_len]);
             q.len += copy_len;
-            remaining = remaining[copy_len..];
-            if (remaining.len == 0) return elements.len;
+            n += copy_len;
+            if (n == elements.len) return elements.len;
         }
 
-        const total_filled = elements.len - remaining.len;
-        if (total_filled >= min) return total_filled;
+        // Don't block if we hit the target.
+        if (n >= target) return n;
 
-        var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+        var pending: Put = .{
+            .remaining = elements[n..],
+            .needed = target - n,
+            .condition = .init,
+            .node = .{},
+        };
         q.putters.append(&pending.node);
-        defer if (pending.remaining.len > 0) q.putters.remove(&pending.node);
-        while (pending.remaining.len > 0) if (uncancelable)
-            pending.condition.waitUncancelable(io, &q.mutex)
-        else
-            try pending.condition.wait(io, &q.mutex);
-        return elements.len;
+        defer if (pending.needed > 0) q.putters.remove(&pending.node);
+
+        while (pending.needed > 0 and !q.closed) {
+            if (uncancelable) {
+                pending.condition.waitUncancelable(io, &q.mutex);
+                continue;
+            }
+            pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
+                error.Canceled => if (pending.remaining.len == elements.len) {
+                    // Canceled while waiting, and appended no elements.
+                    return error.Canceled;
+                } else {
+                    // Canceled while waiting, but appended some elements, so report those first.
+                    io.recancel();
+                    return elements.len - pending.remaining.len;
+                },
+            };
+        }
+        if (pending.remaining.len == elements.len) {
+            // The queue was closed while we were waiting. We appended no elements.
+            assert(q.closed);
+            return error.Closed;
+        }
+        return elements.len - pending.remaining.len;
     }
 
-    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
+    pub fn get(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) (QueueClosedError || Cancelable)!usize {
         assert(buffer.len >= min);
         if (buffer.len == 0) return 0;
         try q.mutex.lock(io);
@@ -1687,13 +1749,14 @@ pub const TypeErasedQueue = struct {
     /// Same as `get`, except does not introduce a cancelation point.
     ///
     /// For a description of cancelation and cancelation points, see `Future.cancel`.
-    pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) usize {
+    pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) QueueClosedError!usize {
         assert(buffer.len >= min);
         if (buffer.len == 0) return 0;
         q.mutex.lockUncancelable(io);
         defer q.mutex.unlock(io);
         return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
             error.Canceled => unreachable,
+            error.Closed => |e| return e,
         };
     }
 
@@ -1703,21 +1766,23 @@ pub const TypeErasedQueue = struct {
         return if (slice.len > 0) slice else null;
     }
 
-    fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
+    fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
         // The ring buffer gets first priority, then data should come from any
         // queued putters, then finally the ring buffer should be filled with
         // data from putters so they can be resumed.
-        var remaining = buffer;
+
+        // The number of elements we received immediately, before possibly blocking.
+        var n: usize = 0;
+
         while (q.gettableSlice()) |slice| {
-            const copy_len = @min(slice.len, remaining.len);
+            const copy_len = @min(slice.len, buffer.len - n);
             assert(copy_len > 0);
-            @memcpy(remaining[0..copy_len], slice[0..copy_len]);
+            @memcpy(buffer[n..][0..copy_len], slice[0..copy_len]);
             q.start += copy_len;
             if (q.buffer.len - q.start == 0) q.start = 0;
             q.len -= copy_len;
-            remaining = remaining[copy_len..];
-            if (remaining.len == 0) {
+            n += copy_len;
+            if (n == buffer.len) {
                 q.fillRingBufferFromPutters(io);
                 return buffer.len;
             }
@@ -1726,33 +1791,64 @@ pub const TypeErasedQueue = struct {
 
         // Copy directly from putters into buffer.
         while (q.putters.popFirst()) |putter_node| {
             const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
-            const copy_len = @min(putter.remaining.len, remaining.len);
+            const copy_len = @min(putter.remaining.len, buffer.len - n);
             assert(copy_len > 0);
-            @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
+            @memcpy(buffer[n..][0..copy_len], putter.remaining[0..copy_len]);
             putter.remaining = putter.remaining[copy_len..];
-            remaining = remaining[copy_len..];
-            if (putter.remaining.len == 0) {
+            putter.needed -|= copy_len;
+            n += copy_len;
+            if (putter.needed == 0) {
                 putter.condition.signal(io);
-                if (remaining.len > 0) continue;
-            } else q.putters.prepend(putter_node);
-            assert(remaining.len == 0);
-            q.fillRingBufferFromPutters(io);
-            return buffer.len;
+            } else {
+                assert(n == buffer.len); // we didn't have enough space for the putter
+                q.putters.prepend(putter_node);
+            }
+            if (n == buffer.len) {
+                q.fillRingBufferFromPutters(io);
+                return buffer.len;
+            }
         }
 
-        // Both ring buffer and putters queue is empty.
-        const total_filled = buffer.len - remaining.len;
-        if (total_filled >= min) return total_filled;
+        // No need to call `fillRingBufferFromPutters` from this point onwards,
+        // because we emptied the ring buffer *and* the putter queue!
 
-        var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+        // Don't block if we hit the target or if the queue is closed. Return how
+        // many elements we could get immediately, unless the queue was closed and
+        // empty, in which case report `error.Closed`.
+        if (n == 0 and q.closed) return error.Closed;
+        if (n >= target or q.closed) return n;
+
+        var pending: Get = .{
+            .remaining = buffer[n..],
+            .needed = target - n,
+            .condition = .init,
+            .node = .{},
+        };
         q.getters.append(&pending.node);
-        defer if (pending.remaining.len > 0) q.getters.remove(&pending.node);
-        while (pending.remaining.len > 0) if (uncancelable)
-            pending.condition.waitUncancelable(io, &q.mutex)
-        else
-            try pending.condition.wait(io, &q.mutex);
-        q.fillRingBufferFromPutters(io);
-        return buffer.len;
+        defer if (pending.needed > 0) q.getters.remove(&pending.node);
+
+        while (pending.needed > 0 and !q.closed) {
+            if (uncancelable) {
+                pending.condition.waitUncancelable(io, &q.mutex);
+                continue;
+            }
+            pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
+                error.Canceled => if (pending.remaining.len == buffer.len) {
+                    // Canceled while waiting, and received no elements.
+                    return error.Canceled;
+                } else {
+                    // Canceled while waiting, but received some elements, so report those first.
+                    io.recancel();
+                    return buffer.len - pending.remaining.len;
+                },
+            };
+        }
+        if (pending.remaining.len == buffer.len) {
+            // The queue was closed while we were waiting. We received no elements.
+            assert(q.closed);
+            return error.Closed;
+        }
+        return buffer.len - pending.remaining.len;
     }
 
     /// Called when there is nonzero space available in the ring buffer and
@@ -1768,7 +1864,8 @@ pub const TypeErasedQueue = struct {
             @memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
             q.len += copy_len;
             putter.remaining = putter.remaining[copy_len..];
-            if (putter.remaining.len == 0) {
+            putter.needed -|= copy_len;
+            if (putter.needed == 0) {
                 putter.condition.signal(io);
                 break;
             }
@@ -1791,59 +1888,101 @@ pub fn Queue(Elem: type) type {
             return .{ .type_erased = .init(@ptrCast(buffer)) };
         }
 
-        /// Appends elements to the end of the queue. The function returns when
-        /// at least `min` elements have been added to the buffer or sent
-        /// directly to a consumer.
+        pub fn close(q: *@This(), io: Io) void {
+            q.type_erased.close(io);
+        }
+
+        /// Appends elements to the end of the queue, potentially blocking if
+        /// there is insufficient capacity. Returns when any one of the
+        /// following conditions is satisfied:
+        ///
+        /// * At least `target` elements have been added to the queue
+        /// * The queue is closed
+        /// * The current task is canceled
+        ///
+        /// Returns how many of `elements` have been added to the queue, if any.
+        /// If an error is returned, no elements have been added.
         ///
-        /// Returns how many elements have been added to the queue.
+        /// If the queue is closed or the task is canceled, but some items were
+        /// already added before the closure or cancelation, then `put` may
+        /// return a number lower than `target`, in which case future calls are
+        /// guaranteed to return `error.Canceled` or `error.Closed`.
         ///
-        /// Asserts that `elements.len >= min`.
-        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
-            return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+        /// A return value of 0 is only possible if `target` is 0, in which case
+        /// the call is guaranteed to queue as many of `elements` as is possible
+        /// *without* blocking.
+        ///
+        /// Asserts that `elements.len >= target`.
+        pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize {
+            return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
         /// Same as `put` but blocks until all elements have been added to the queue.
-        pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void {
-            assert(try q.put(io, elements, elements.len) == elements.len);
+        ///
+        /// If the queue is closed or canceled, `error.Closed` or `error.Canceled`
+        /// is returned, and it is unspecified how many, if any, of `elements` were
+        /// added to the queue prior to cancelation or closure.
+        pub fn putAll(q: *@This(), io: Io, elements: []const Elem) (QueueClosedError || Cancelable)!void {
+            const n = try q.put(io, elements, elements.len);
+            if (n != elements.len) {
+                _ = try q.put(io, elements[n..], elements.len - n);
+                unreachable; // partial `put` implies queue was closed or we were canceled
+            }
         }
 
         /// Same as `put`, except does not introduce a cancelation point.
         ///
         /// For a description of cancelation and cancelation points, see `Future.cancel`.
-        pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
-            return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+        pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) QueueClosedError!usize {
+            return @divExact(try q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
        }
 
-        pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
+        /// Appends `item` to the end of the queue, blocking if the queue is full.
+        pub fn putOne(q: *@This(), io: Io, item: Elem) (QueueClosedError || Cancelable)!void {
             assert(try q.put(io, &.{item}, 1) == 1);
         }
 
         /// Same as `putOne`, except does not introduce a cancelation point.
         ///
         /// For a description of cancelation and cancelation points, see `Future.cancel`.
-        pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
-            assert(q.putUncancelable(io, &.{item}, 1) == 1);
+        pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) QueueClosedError!void {
+            assert(try q.putUncancelable(io, &.{item}, 1) == 1);
         }
 
-        /// Receives elements from the beginning of the queue. The function
-        /// returns when at least `min` elements have been populated inside
-        /// `buffer`.
+        /// Receives elements from the beginning of the queue, potentially blocking
+        /// if there are insufficient elements currently in the queue. Returns when
+        /// any one of the following conditions is satisfied:
+        ///
+        /// * At least `target` elements have been received from the queue
+        /// * The queue is closed and contains no buffered elements
+        /// * The current task is canceled
+        ///
+        /// Returns how many elements of `buffer` have been populated, if any.
+        /// If an error is returned, no elements have been populated.
+        ///
+        /// If the queue is closed or the task is canceled, but some items were
+        /// already received before the closure or cancelation, then `get` may
+        /// return a number lower than `target`, in which case future calls are
+        /// guaranteed to return `error.Canceled` or `error.Closed`.
         ///
-        /// Returns how many elements of `buffer` have been populated.
+        /// A return value of 0 is only possible if `target` is 0, in which case
+        /// the call is guaranteed to fill as much of `buffer` as is possible
+        /// *without* blocking.
         ///
-        /// Asserts that `buffer.len >= min`.
-        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
-            return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+        /// Asserts that `buffer.len >= target`.
+        pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize {
+            return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem));
        }
 
         /// Same as `get`, except does not introduce a cancelation point.
         ///
         /// For a description of cancelation and cancelation points, see `Future.cancel`.
-        pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
+        pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) QueueClosedError!usize {
             return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
-        pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
+        /// Receives one element from the beginning of the queue, blocking if the queue is empty.
+        pub fn getOne(q: *@This(), io: Io) (QueueClosedError || Cancelable)!Elem {
             var buf: [1]Elem = undefined;
             assert(try q.get(io, &buf, 1) == 1);
             return buf[0];
@@ -1852,9 +1991,9 @@ pub fn Queue(Elem: type) type {
         /// Same as `getOne`, except does not introduce a cancelation point.
         ///
         /// For a description of cancelation and cancelation points, see `Future.cancel`.
-        pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
+        pub fn getOneUncancelable(q: *@This(), io: Io) QueueClosedError!Elem {
             var buf: [1]Elem = undefined;
-            assert(q.getUncancelable(io, &buf, 1) == 1);
+            assert(try q.getUncancelable(io, &buf, 1) == 1);
             return buf[0];
         }
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 65d7f1dec2..37a6a7b656 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -5795,11 +5795,13 @@ fn netLookup(
     host_name: HostName,
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
-) void {
+) net.HostName.LookupError!void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const current_thread = Thread.getCurrent(t);
-    const t_io = io(t);
-    resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, current_thread, host_name, resolved, options) });
+    defer resolved.close(io(t));
+    netLookupFallible(t, host_name, resolved, options) catch |err| switch (err) {
+        error.Closed => unreachable, // `resolved` must not be closed until `netLookup` returns
+        else => |e| return e,
+    };
 }
 
 fn netLookupUnavailable(
@@ -5807,22 +5809,23 @@ fn netLookupUnavailable(
     host_name: HostName,
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
-) void {
+) net.HostName.LookupError!void {
     _ = host_name;
     _ = options;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const t_io = ioBasic(t);
-    resolved.putOneUncancelable(t_io, .{ .end = error.NetworkDown });
+    resolved.close(ioBasic(t));
+    return error.NetworkDown;
 }
 
 fn netLookupFallible(
     t: *Threaded,
-    current_thread: *Thread,
     host_name: HostName,
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
-) !void {
+) (net.HostName.LookupError || Io.QueueClosedError)!void {
     if (!have_networking) return error.NetworkDown;
+
+    const current_thread: *Thread = .getCurrent(t);
     const t_io = io(t);
     const name = host_name.bytes;
     assert(name.len <= HostName.max_len);
@@ -6363,7 +6366,7 @@ fn lookupDnsSearch(
     host_name: HostName,
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
-) HostName.LookupError!void {
+) (HostName.LookupError || Io.QueueClosedError)!void {
     const t_io = io(t);
 
     const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;
@@ -6407,7 +6410,7 @@ fn lookupDns(
     rc: *const HostName.ResolvConf,
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
-) HostName.LookupError!void {
+) (HostName.LookupError || Io.QueueClosedError)!void {
     const t_io = io(t);
     const family_records: [2]struct { af: IpAddress.Family, rr: HostName.DnsRecord } = .{
         .{ .af = .ip6, .rr = .A },
@@ -6621,8 +6624,10 @@ fn lookupHosts(
                 return error.DetectingNetworkConfigurationFailed;
             },
         },
-        error.Canceled => |e| return e,
-        error.UnknownHostName => |e| return e,
+        error.Canceled,
+        error.Closed,
+        error.UnknownHostName,
+        => |e| return e,
     };
 }
 
@@ -6632,7 +6637,7 @@ fn lookupHostsReader(
     resolved: *Io.Queue(HostName.LookupResult),
     options: HostName.LookupOptions,
     reader: *Io.Reader,
-) error{ ReadFailed, Canceled, UnknownHostName }!void {
+) error{ ReadFailed, Canceled, UnknownHostName, Closed }!void {
     const t_io = io(t);
     var addresses_len: usize = 0;
     var canonical_name: ?HostName = null;
diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig
index e2638abfaa..628a97d1f8 100644
--- a/lib/std/Io/net/HostName.zig
+++ b/lib/std/Io/net/HostName.zig
@@ -82,19 +82,22 @@ pub const LookupError = error{
 
 pub const LookupResult = union(enum) {
     address: IpAddress,
     canonical_name: HostName,
-    end: LookupError!void,
 };
 
-/// Adds any number of `IpAddress` into resolved, exactly one canonical_name,
-/// and then always finishes by adding one `LookupResult.end` entry.
+/// Adds any number of `LookupResult.address` into `resolved`, and exactly one
+/// `LookupResult.canonical_name`.
 ///
 /// Guaranteed not to block if provided queue has capacity at least 16.
+///
+/// Closes `resolved` before return, even on error.
+///
+/// Asserts `resolved` is not closed until this call returns.
 pub fn lookup(
     host_name: HostName,
     io: Io,
     resolved: *Io.Queue(LookupResult),
     options: LookupOptions,
-) void {
+) LookupError!void {
     return io.vtable.netLookup(io.userdata, host_name, resolved, options);
 }
 
@@ -211,23 +214,25 @@ pub fn connect(
     port: u16,
     options: IpAddress.ConnectOptions,
 ) ConnectError!Stream {
-    var connect_many_buffer: [32]ConnectManyResult = undefined;
-    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);
+    var connect_many_buffer: [32]IpAddress.ConnectError!Stream = undefined;
+    var connect_many_queue: Io.Queue(IpAddress.ConnectError!Stream) = .init(&connect_many_buffer);
     var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
-    var saw_end = false;
     defer {
-        connect_many.cancel(io);
-        if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
-            .connection => |loser| if (loser) |s| s.close(io) else |_| continue,
-            .end => break,
-        };
+        connect_many.cancel(io) catch {};
+        while (connect_many_queue.getOneUncancelable(io)) |loser| {
+            if (loser) |s| s.close(io) else |_| {}
+        } else |err| switch (err) {
+            error.Closed => {},
+        }
     }
 
-    var aggregate_error: ConnectError = error.UnknownHostName;
+    var ip_connect_error: ?IpAddress.ConnectError = null;
 
-    while (connect_many_queue.getOne(io)) |result| switch (result) {
-        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
+    while (connect_many_queue.getOne(io)) |result| {
+        if (result) |stream| {
+            return stream;
+        } else |err| switch (err) {
             error.SystemResources,
             error.OptionUnsupported,
             error.ProcessFdQuotaExceeded,
@@ -237,66 +242,80 @@ pub fn connect(
 
             error.WouldBlock => return error.Unexpected,
 
-            else => |e| aggregate_error = e,
-        },
-        .end => |end| {
-            saw_end = true;
-            try end;
-            return aggregate_error;
-        },
+            else => |e| ip_connect_error = e,
+        }
     } else |err| switch (err) {
         error.Canceled => |e| return e,
+        error.Closed => {
+            // There was no successful connection attempt. If there was a lookup error, return that.
+            try connect_many.await(io);
+            // Otherwise, return the error from a failed IP connection attempt.
+            return ip_connect_error orelse
+                return error.UnknownHostName;
+        },
     }
 }
 
-pub const ConnectManyResult = union(enum) {
-    connection: IpAddress.ConnectError!Stream,
-    end: ConnectError!void,
-};
-
 /// Asynchronously establishes a connection to all IP addresses associated with
 /// a host name, adding them to a results queue upon completion.
+///
+/// Closes `results` before return, even on error.
+///
+/// Asserts `results` is not closed until this call returns.
 pub fn connectMany(
     host_name: HostName,
     io: Io,
     port: u16,
-    results: *Io.Queue(ConnectManyResult),
+    results: *Io.Queue(IpAddress.ConnectError!Stream),
     options: IpAddress.ConnectOptions,
-) void {
+) LookupError!void {
+    defer results.close(io);
+
     var canonical_name_buffer: [max_len]u8 = undefined;
     var lookup_buffer: [32]HostName.LookupResult = undefined;
     var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);
-    var group: Io.Group = .init;
-    defer group.cancel(io);
-
-    group.async(io, lookup, .{
+    var lookup_future = io.async(lookup, .{
         host_name, io, &lookup_queue, .{
             .port = port,
            .canonical_name_buffer = &canonical_name_buffer,
         } });
+    defer lookup_future.cancel(io) catch {};
+
+    var group: Io.Group = .init;
+    defer group.cancel(io);
 
     while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
         .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
         .canonical_name => continue,
-        .end => |lookup_result| {
-            group.wait(io);
-            results.putOneUncancelable(io, .{ .end = lookup_result });
-            return;
-        },
     } else |err| switch (err) {
-        error.Canceled => |e| {
-            group.cancel(io);
-            results.putOneUncancelable(io, .{ .end = e });
+        error.Canceled => |e| return e,
+        error.Closed => {
+            group.wait(io);
+            return lookup_future.await(io);
         },
     }
 }
-
 fn enqueueConnection(
     address: IpAddress,
     io: Io,
-    queue: *Io.Queue(ConnectManyResult),
+    queue: *Io.Queue(IpAddress.ConnectError!Stream),
     options: IpAddress.ConnectOptions,
 ) void {
-    queue.putOneUncancelable(io, .{ .connection = address.connect(io, options) });
+    enqueueConnectionFallible(address, io, queue, options) catch |err| switch (err) {
+        error.Canceled => {},
+    };
+}
+
+fn enqueueConnectionFallible(
+    address: IpAddress,
+    io: Io,
+    queue: *Io.Queue(IpAddress.ConnectError!Stream),
+    options: IpAddress.ConnectOptions,
+) Io.Cancelable!void {
+    const result = address.connect(io, options);
+    errdefer if (result) |s| s.close(io) else |_| {};
+    queue.putOne(io, result) catch |err| switch (err) {
+        error.Closed => unreachable, // `queue` must not be closed
+        error.Canceled => |e| return e,
+    };
 }
 
 pub const ResolvConf = struct {
diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig
index 2a7a151b5d..e234a9edde 100644
--- a/lib/std/Io/net/test.zig
+++ b/lib/std/Io/net/test.zig
@@ -129,7 +129,7 @@ test "resolve DNS" {
         var results_buffer: [32]net.HostName.LookupResult = undefined;
         var results: Io.Queue(net.HostName.LookupResult) = .init(&results_buffer);
 
-        net.HostName.lookup(try .init("localhost"), io, &results, .{
+        try net.HostName.lookup(try .init("localhost"), io, &results, .{
             .port = 80,
             .canonical_name_buffer = &canonical_name_buffer,
         });
@@ -142,11 +142,10 @@ test "resolve DNS" {
                 addresses_found += 1;
             },
             .canonical_name => |canonical_name| try testing.expectEqualStrings("localhost", canonical_name.bytes),
-            .end => |end| {
-                try end;
-                break;
-            },
-        } else |err| return err;
+        } else |err| switch (err) {
+            error.Closed => {},
+            error.Canceled => |e| return e,
+        }
 
         try testing.expect(addresses_found != 0);
     }
@@ -161,20 +160,19 @@ test "resolve DNS" {
         net.HostName.lookup(try .init("example.com"), io, &results, .{
             .port = 80,
             .canonical_name_buffer = &canonical_name_buffer,
-        });
+        }) catch |err| switch (err) {
+            error.UnknownHostName => return error.SkipZigTest,
+            error.NameServerFailure => return error.SkipZigTest,
+            else => |e| return e,
+        };
 
         while (results.getOne(io)) |result| switch (result) {
             .address => {},
             .canonical_name => {},
-            .end => |end| {
-                end catch |err| switch (err) {
-                    error.UnknownHostName => return error.SkipZigTest,
-                    error.NameServerFailure => return error.SkipZigTest,
-                    else => return err,
-                };
-                break;
-            },
-        } else |err| return err;
+        } else |err| switch (err) {
+            error.Closed => {},
+            error.Canceled => |e| return e,
+        }
     }
 }
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index 94f280b358..f7965ed14e 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -209,10 +209,10 @@ test "select" {
             return;
         },
     };
-    defer if (get_a.cancel(io)) |_| {} else |_| @panic("fail");
+    defer _ = get_a.cancel(io) catch {};
 
     var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io });
-    defer if (get_b.cancel(io)) |_| {} else |_| @panic("fail");
+    defer _ = get_b.cancel(io) catch {};
 
     var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake });
     defer timeout.cancel(io) catch {};
@@ -225,12 +225,9 @@ test "select" {
         .get_a => return error.TestFailure,
         .get_b => return error.TestFailure,
         .timeout => {
-            // Unblock the queues to avoid making this unit test depend on
-            // cancellation.
-            queue.putOneUncancelable(io, 1);
-            queue.putOneUncancelable(io, 1);
-            try testing.expectEqual(1, try get_a.await(io));
-            try testing.expectEqual(1, try get_b.await(io));
+            queue.close(io);
+            try testing.expectError(error.Closed, get_a.await(io));
+            try testing.expectError(error.Closed, get_b.await(io));
         },
     }
 }
@@ -256,6 +253,54 @@ test "Queue" {
     try testQueue(5);
 }
 
+test "Queue.close single-threaded" {
+    const io = std.testing.io;
+
+    var buf: [10]u8 = undefined;
+    var queue: Io.Queue(u8) = .init(&buf);
+
+    try queue.putAll(io, &.{ 0, 1, 2, 3, 4, 5, 6 });
+    try expectEqual(3, try queue.put(io, &.{ 7, 8, 9, 10 }, 0)); // there is capacity for 3 more items
+
+    var get_buf: [4]u8 = undefined;
+
+    // Receive some elements before closing
+    try expectEqual(4, try queue.get(io, &get_buf, 0));
+    try expectEqual(0, get_buf[0]);
+    try expectEqual(1, get_buf[1]);
+    try expectEqual(2, get_buf[2]);
+    try expectEqual(3, get_buf[3]);
+    try expectEqual(4, try queue.getOne(io));
+
+    // ...and add a couple more now there's space
+    try queue.putAll(io, &.{ 20, 21 });
+
+    queue.close(io);
+
+    // Receive more elements *after* closing
+    try expectEqual(4, try queue.get(io, &get_buf, 0));
+    try expectEqual(5, get_buf[0]);
+    try expectEqual(6, get_buf[1]);
+    try expectEqual(7, get_buf[2]);
+    try expectEqual(8, get_buf[3]);
+    try expectEqual(9, try queue.getOne(io));
+
+    // Cannot put anything while closed, even if the buffer has space
+    try expectError(error.Closed, queue.putOne(io, 100));
+    try expectError(error.Closed, queue.putAll(io, &.{ 101, 102 }));
+    try expectError(error.Closed, queue.putUncancelable(io, &.{ 103, 104 }, 0));
+
+    // Even if we ask for 4 items, the queue is closed, so we only get the last 2
+    try expectEqual(2, try queue.get(io, &get_buf, 4));
+    try expectEqual(20, get_buf[0]);
+    try expectEqual(21, get_buf[1]);
+
+    // The queue is now empty, so `get` should return `error.Closed` too
+    try expectError(error.Closed, queue.getOne(io));
+    try expectError(error.Closed, queue.get(io, &get_buf, 0));
+    try expectError(error.Closed, queue.putUncancelable(io, &get_buf, 2));
+}
+
 test "Event" {
     const global = struct {
         fn waitAndRead(io: Io, event: *Io.Event, ptr: *const u32) Io.Cancelable!u32 {
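
Usage sketch (not part of the commit; `produce` and `consume` are illustrative names, and only the `std.Io.Queue` methods shown in the diff above are assumed). Closing a queue replaces the old sentinel-element protocol for signaling end-of-stream:

const std = @import("std");
const Io = std.Io;

// Producer: closing the queue replaces pushing a sentinel element.
// `defer` ensures consumers are unblocked even on an error path.
fn produce(io: Io, queue: *Io.Queue(u8)) void {
    defer queue.close(io);
    queue.putAll(io, &.{ 1, 2, 3 }) catch return; // Closed or Canceled: just stop
}

// Consumer: drain until `getOne` fails; `error.Closed` now means
// "producer finished and the buffer is empty".
fn consume(io: Io, queue: *Io.Queue(u8), sum: *u64) Io.Cancelable!void {
    while (queue.getOne(io)) |item| {
        sum.* += item;
    } else |err| switch (err) {
        error.Closed => {},
        error.Canceled => |e| return e,
    }
}

With `close`, termination is reported through the queue's own error set, so `getOne`, bulk `get`, and `Select.wait` all observe it uniformly instead of each consumer pattern-matching a final `.end` union member.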
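A corresponding sketch of the new `HostName.lookup` calling convention, mirroring the updated `lib/std/Io/net/test.zig` hunks above (`countAddresses`, the port, and the 32-entry buffer are illustrative assumptions; the lookup error now comes from the call itself, while the queue terminates via `error.Closed`):

const std = @import("std");
const Io = std.Io;
const net = std.Io.net;

// Counts resolved addresses for a host under the new protocol.
fn countAddresses(io: Io, name: []const u8) !usize {
    var canonical_name_buffer: [net.HostName.max_len]u8 = undefined;
    var results_buffer: [32]net.HostName.LookupResult = undefined;
    var results: Io.Queue(net.HostName.LookupResult) = .init(&results_buffer);

    try net.HostName.lookup(try .init(name), io, &results, .{
        .port = 80,
        .canonical_name_buffer = &canonical_name_buffer,
    });

    var count: usize = 0;
    while (results.getOne(io)) |result| switch (result) {
        .address => count += 1,
        .canonical_name => {},
    } else |err| switch (err) {
        error.Closed => {}, // lookup finished and closed the queue
        error.Canceled => |e| return e,
    }
    return count;
}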
