aboutsummaryrefslogtreecommitdiff
path: root/lib/std
diff options
context:
space:
mode:
Diffstat (limited to 'lib/std')
-rw-r--r--lib/std/Io.zig303
-rw-r--r--lib/std/Io/Threaded.zig33
-rw-r--r--lib/std/Io/net/HostName.zig107
-rw-r--r--lib/std/Io/net/test.zig30
-rw-r--r--lib/std/Io/test.zig61
5 files changed, 370 insertions, 164 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 7c5c0ed70a..367871787c 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -701,7 +701,7 @@ pub const VTable = struct {
netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
- netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void,
+ netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
};
pub const Cancelable = error{
@@ -1208,7 +1208,9 @@ pub fn Select(comptime U: type) type {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const unerased_select: *S = @fieldParentPtr("group", group);
const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
- unerased_select.queue.putOneUncancelable(unerased_select.io, elem);
+ unerased_select.queue.putOneUncancelable(unerased_select.io, elem) catch |err| switch (err) {
+ error.Closed => unreachable,
+ };
}
};
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
@@ -1222,7 +1224,10 @@ pub fn Select(comptime U: type) type {
/// Not threadsafe.
pub fn wait(s: *S) Cancelable!U {
s.outstanding -= 1;
- return s.queue.getOne(s.io);
+ return s.queue.getOne(s.io) catch |err| switch (err) {
+ error.Canceled => |e| return e,
+ error.Closed => unreachable,
+ };
}
/// Equivalent to `wait` but requests cancellation on all remaining
@@ -1569,8 +1574,11 @@ pub const Event = enum(u32) {
}
};
+pub const QueueClosedError = error{Closed};
+
pub const TypeErasedQueue = struct {
mutex: Mutex,
+ closed: bool,
/// Ring buffer. This data is logically *after* queued getters.
buffer: []u8,
@@ -1582,12 +1590,14 @@ pub const TypeErasedQueue = struct {
const Put = struct {
remaining: []const u8,
+ needed: usize,
condition: Condition,
node: std.DoublyLinkedList.Node,
};
const Get = struct {
remaining: []u8,
+ needed: usize,
condition: Condition,
node: std.DoublyLinkedList.Node,
};
@@ -1595,6 +1605,7 @@ pub const TypeErasedQueue = struct {
pub fn init(buffer: []u8) TypeErasedQueue {
return .{
.mutex = .init,
+ .closed = false,
.buffer = buffer,
.start = 0,
.len = 0,
@@ -1603,7 +1614,27 @@ pub const TypeErasedQueue = struct {
};
}
- pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
+ pub fn close(q: *TypeErasedQueue, io: Io) void {
+ q.mutex.lockUncancelable(io);
+ defer q.mutex.unlock(io);
+ q.closed = true;
+ {
+ var it = q.getters.first;
+ while (it) |node| : (it = node.next) {
+ const getter: *Get = @alignCast(@fieldParentPtr("node", node));
+ getter.condition.signal(io);
+ }
+ }
+ {
+ var it = q.putters.first;
+ while (it) |node| : (it = node.next) {
+ const putter: *Put = @alignCast(@fieldParentPtr("node", node));
+ putter.condition.signal(io);
+ }
+ }
+ }
+
+ pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) (QueueClosedError || Cancelable)!usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
try q.mutex.lock(io);
@@ -1614,13 +1645,14 @@ pub const TypeErasedQueue = struct {
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+ pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) QueueClosedError!usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
return q.putLocked(io, elements, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
+ error.Closed => |e| return e,
};
}
@@ -1634,49 +1666,79 @@ pub const TypeErasedQueue = struct {
return if (slice.len > 0) slice else null;
}
- fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize {
+ fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
+ // A closed queue cannot be added to, even if there is space in the buffer.
+ if (q.closed) return error.Closed;
+
// Getters have first priority on the data, and only when the getters
// queue is empty do we start populating the buffer.
- var remaining = elements;
+ // The number of elements we add immediately, before possibly blocking.
+ var n: usize = 0;
+
while (q.getters.popFirst()) |getter_node| {
const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
- const copy_len = @min(getter.remaining.len, remaining.len);
+ const copy_len = @min(getter.remaining.len, elements.len - n);
assert(copy_len > 0);
- @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]);
- remaining = remaining[copy_len..];
+ @memcpy(getter.remaining[0..copy_len], elements[n..][0..copy_len]);
getter.remaining = getter.remaining[copy_len..];
- if (getter.remaining.len == 0) {
+ getter.needed -|= copy_len;
+ n += copy_len;
+ if (getter.needed == 0) {
getter.condition.signal(io);
- if (remaining.len > 0) continue;
- } else q.getters.prepend(getter_node);
- assert(remaining.len == 0);
- return elements.len;
+ } else {
+ assert(n == elements.len); // we didn't have enough elements for the getter
+ q.getters.prepend(getter_node);
+ }
+ if (n == elements.len) return elements.len;
}
while (q.puttableSlice()) |slice| {
- const copy_len = @min(slice.len, remaining.len);
+ const copy_len = @min(slice.len, elements.len - n);
assert(copy_len > 0);
- @memcpy(slice[0..copy_len], remaining[0..copy_len]);
+ @memcpy(slice[0..copy_len], elements[n..][0..copy_len]);
q.len += copy_len;
- remaining = remaining[copy_len..];
- if (remaining.len == 0) return elements.len;
+ n += copy_len;
+ if (n == elements.len) return elements.len;
}
- const total_filled = elements.len - remaining.len;
- if (total_filled >= min) return total_filled;
+ // Don't block if we hit the target.
+ if (n >= target) return n;
- var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+ var pending: Put = .{
+ .remaining = elements[n..],
+ .needed = target - n,
+ .condition = .init,
+ .node = .{},
+ };
q.putters.append(&pending.node);
- defer if (pending.remaining.len > 0) q.putters.remove(&pending.node);
- while (pending.remaining.len > 0) if (uncancelable)
- pending.condition.waitUncancelable(io, &q.mutex)
- else
- try pending.condition.wait(io, &q.mutex);
- return elements.len;
+ defer if (pending.needed > 0) q.putters.remove(&pending.node);
+
+ while (pending.needed > 0 and !q.closed) {
+ if (uncancelable) {
+ pending.condition.waitUncancelable(io, &q.mutex);
+ continue;
+ }
+ pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
+ error.Canceled => if (pending.remaining.len == elements.len) {
+ // Canceled while waiting, and appended no elements.
+ return error.Canceled;
+ } else {
+ // Canceled while waiting, but appended some elements, so report those first.
+ io.recancel();
+ return elements.len - pending.remaining.len;
+ },
+ };
+ }
+ if (pending.remaining.len == elements.len) {
+ // The queue was closed while we were waiting. We appended no elements.
+ assert(q.closed);
+ return error.Closed;
+ }
+ return elements.len - pending.remaining.len;
}
- pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
+ pub fn get(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) (QueueClosedError || Cancelable)!usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
try q.mutex.lock(io);
@@ -1687,13 +1749,14 @@ pub const TypeErasedQueue = struct {
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) usize {
+ pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) QueueClosedError!usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
+ error.Closed => |e| return e,
};
}
@@ -1703,21 +1766,23 @@ pub const TypeErasedQueue = struct {
return if (slice.len > 0) slice else null;
}
- fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
+ fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
// The ring buffer gets first priority, then data should come from any
// queued putters, then finally the ring buffer should be filled with
// data from putters so they can be resumed.
- var remaining = buffer;
+ // The number of elements we received immediately, before possibly blocking.
+ var n: usize = 0;
+
while (q.gettableSlice()) |slice| {
- const copy_len = @min(slice.len, remaining.len);
+ const copy_len = @min(slice.len, buffer.len - n);
assert(copy_len > 0);
- @memcpy(remaining[0..copy_len], slice[0..copy_len]);
+ @memcpy(buffer[n..][0..copy_len], slice[0..copy_len]);
q.start += copy_len;
if (q.buffer.len - q.start == 0) q.start = 0;
q.len -= copy_len;
- remaining = remaining[copy_len..];
- if (remaining.len == 0) {
+ n += copy_len;
+ if (n == buffer.len) {
q.fillRingBufferFromPutters(io);
return buffer.len;
}
@@ -1726,33 +1791,64 @@ pub const TypeErasedQueue = struct {
// Copy directly from putters into buffer.
while (q.putters.popFirst()) |putter_node| {
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
- const copy_len = @min(putter.remaining.len, remaining.len);
+ const copy_len = @min(putter.remaining.len, buffer.len - n);
assert(copy_len > 0);
- @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
+ @memcpy(buffer[n..][0..copy_len], putter.remaining[0..copy_len]);
putter.remaining = putter.remaining[copy_len..];
- remaining = remaining[copy_len..];
- if (putter.remaining.len == 0) {
+ putter.needed -|= copy_len;
+ n += copy_len;
+ if (putter.needed == 0) {
putter.condition.signal(io);
- if (remaining.len > 0) continue;
- } else q.putters.prepend(putter_node);
- assert(remaining.len == 0);
- q.fillRingBufferFromPutters(io);
- return buffer.len;
+ } else {
+ assert(n == buffer.len); // we didn't have enough space for the putter
+ q.putters.prepend(putter_node);
+ }
+ if (n == buffer.len) {
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
+ }
}
- // Both ring buffer and putters queue is empty.
- const total_filled = buffer.len - remaining.len;
- if (total_filled >= min) return total_filled;
+ // No need to call `fillRingBufferFromPutters` from this point onwards,
+ // because we emptied the ring buffer *and* the putter queue!
- var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+ // Don't block if we hit the target or if the queue is closed. Return how
+ // many elements we could get immediately, unless the queue was closed and
+ // empty, in which case report `error.Closed`.
+ if (n == 0 and q.closed) return error.Closed;
+ if (n >= target or q.closed) return n;
+
+ var pending: Get = .{
+ .remaining = buffer[n..],
+ .needed = target - n,
+ .condition = .init,
+ .node = .{},
+ };
q.getters.append(&pending.node);
- defer if (pending.remaining.len > 0) q.getters.remove(&pending.node);
- while (pending.remaining.len > 0) if (uncancelable)
- pending.condition.waitUncancelable(io, &q.mutex)
- else
- try pending.condition.wait(io, &q.mutex);
- q.fillRingBufferFromPutters(io);
- return buffer.len;
+ defer if (pending.needed > 0) q.getters.remove(&pending.node);
+
+ while (pending.needed > 0 and !q.closed) {
+ if (uncancelable) {
+ pending.condition.waitUncancelable(io, &q.mutex);
+ continue;
+ }
+ pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
+ error.Canceled => if (pending.remaining.len == buffer.len) {
+ // Canceled while waiting, and received no elements.
+ return error.Canceled;
+ } else {
+ // Canceled while waiting, but received some elements, so report those first.
+ io.recancel();
+ return buffer.len - pending.remaining.len;
+ },
+ };
+ }
+ if (pending.remaining.len == buffer.len) {
+ // The queue was closed while we were waiting. We received no elements.
+ assert(q.closed);
+ return error.Closed;
+ }
+ return buffer.len - pending.remaining.len;
}
/// Called when there is nonzero space available in the ring buffer and
@@ -1768,7 +1864,8 @@ pub const TypeErasedQueue = struct {
@memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
q.len += copy_len;
putter.remaining = putter.remaining[copy_len..];
- if (putter.remaining.len == 0) {
+ putter.needed -|= copy_len;
+ if (putter.needed == 0) {
putter.condition.signal(io);
break;
}
@@ -1791,59 +1888,101 @@ pub fn Queue(Elem: type) type {
return .{ .type_erased = .init(@ptrCast(buffer)) };
}
- /// Appends elements to the end of the queue. The function returns when
- /// at least `min` elements have been added to the buffer or sent
- /// directly to a consumer.
+ pub fn close(q: *@This(), io: Io) void {
+ q.type_erased.close(io);
+ }
+
+ /// Appends elements to the end of the queue, potentially blocking if
+ /// there is insufficient capacity. Returns when any one of the
+ /// following conditions is satisfied:
+ ///
+ /// * At least `target` elements have been added to the queue
+ /// * The queue is closed
+ /// * The current task is canceled
+ ///
+ /// Returns how many of `elements` have been added to the queue, if any.
+ /// If an error is returned, no elements have been added.
///
- /// Returns how many elements have been added to the queue.
+ /// If the queue is closed or the task is canceled, but some items were
+ /// already added before the closure or cancelation, then `put` may
+ /// return a number lower than `target`, in which case future calls are
+ /// guaranteed to return `error.Canceled` or `error.Closed`.
///
- /// Asserts that `elements.len >= min`.
- pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
- return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+ /// A return value of 0 is only possible if `target` is 0, in which case
+ /// the call is guaranteed to queue as many of `elements` as is possible
+ /// *without* blocking.
+ ///
+ /// Asserts that `elements.len >= target`.
+ pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize {
+ return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `put` but blocks until all elements have been added to the queue.
- pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void {
- assert(try q.put(io, elements, elements.len) == elements.len);
+ ///
+ /// If the queue is closed or canceled, `error.Closed` or `error.Canceled`
+ /// is returned, and it is unspecified how many, if any, of `elements` were
+ /// added to the queue prior to cancelation or closure.
+ pub fn putAll(q: *@This(), io: Io, elements: []const Elem) (QueueClosedError || Cancelable)!void {
+ const n = try q.put(io, elements, elements.len);
+ if (n != elements.len) {
+ _ = try q.put(io, elements[n..], elements.len - n);
+ unreachable; // partial `put` implies queue was closed or we were canceled
+ }
}
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
- return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+ pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) QueueClosedError!usize {
+ return @divExact(try q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
- pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
+ /// Appends `item` to the end of the queue, blocking if the queue is full.
+ pub fn putOne(q: *@This(), io: Io, item: Elem) (QueueClosedError || Cancelable)!void {
assert(try q.put(io, &.{item}, 1) == 1);
}
/// Same as `putOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
- assert(q.putUncancelable(io, &.{item}, 1) == 1);
+ pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) QueueClosedError!void {
+ assert(try q.putUncancelable(io, &.{item}, 1) == 1);
}
- /// Receives elements from the beginning of the queue. The function
- /// returns when at least `min` elements have been populated inside
- /// `buffer`.
+ /// Receives elements from the beginning of the queue, potentially blocking
+ /// if there are insufficient elements currently in the queue. Returns when
+ /// any one of the following conditions is satisfied:
+ ///
+ /// * At least `target` elements have been received from the queue
+ /// * The queue is closed and contains no buffered elements
+ /// * The current task is canceled
+ ///
+ /// Returns how many elements of `buffer` have been populated, if any.
+ /// If an error is returned, no elements have been populated.
+ ///
+ /// If the queue is closed or the task is canceled, but some items were
+ /// already received before the closure or cancelation, then `get` may
+ /// return a number lower than `target`, in which case future calls are
+ /// guaranteed to return `error.Canceled` or `error.Closed`.
///
- /// Returns how many elements of `buffer` have been populated.
+ /// A return value of 0 is only possible if `target` is 0, in which case
+ /// the call is guaranteed to fill as much of `buffer` as is possible
+ /// *without* blocking.
///
- /// Asserts that `buffer.len >= min`.
- pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
- return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+ /// Asserts that `buffer.len >= target`.
+ pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize {
+ return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
+ pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) QueueClosedError!usize {
return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
- pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
+ /// Receives one element from the beginning of the queue, blocking if the queue is empty.
+ pub fn getOne(q: *@This(), io: Io) (QueueClosedError || Cancelable)!Elem {
var buf: [1]Elem = undefined;
assert(try q.get(io, &buf, 1) == 1);
return buf[0];
@@ -1852,9 +1991,9 @@ pub fn Queue(Elem: type) type {
/// Same as `getOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
- pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
+ pub fn getOneUncancelable(q: *@This(), io: Io) QueueClosedError!Elem {
var buf: [1]Elem = undefined;
- assert(q.getUncancelable(io, &buf, 1) == 1);
+ assert(try q.getUncancelable(io, &buf, 1) == 1);
return buf[0];
}
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 65d7f1dec2..37a6a7b656 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -5795,11 +5795,13 @@ fn netLookup(
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
-) void {
+) net.HostName.LookupError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
- const current_thread = Thread.getCurrent(t);
- const t_io = io(t);
- resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, current_thread, host_name, resolved, options) });
+ defer resolved.close(io(t));
+ netLookupFallible(t, host_name, resolved, options) catch |err| switch (err) {
+ error.Closed => unreachable, // `resolved` must not be closed until `netLookup` returns
+ else => |e| return e,
+ };
}
fn netLookupUnavailable(
@@ -5807,22 +5809,23 @@ fn netLookupUnavailable(
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
-) void {
+) net.HostName.LookupError!void {
_ = host_name;
_ = options;
const t: *Threaded = @ptrCast(@alignCast(userdata));
- const t_io = ioBasic(t);
- resolved.putOneUncancelable(t_io, .{ .end = error.NetworkDown });
+ resolved.close(ioBasic(t));
+ return error.NetworkDown;
}
fn netLookupFallible(
t: *Threaded,
- current_thread: *Thread,
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
-) !void {
+) (net.HostName.LookupError || Io.QueueClosedError)!void {
if (!have_networking) return error.NetworkDown;
+
+ const current_thread: *Thread = .getCurrent(t);
const t_io = io(t);
const name = host_name.bytes;
assert(name.len <= HostName.max_len);
@@ -6363,7 +6366,7 @@ fn lookupDnsSearch(
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
-) HostName.LookupError!void {
+) (HostName.LookupError || Io.QueueClosedError)!void {
const t_io = io(t);
const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;
@@ -6407,7 +6410,7 @@ fn lookupDns(
rc: *const HostName.ResolvConf,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
-) HostName.LookupError!void {
+) (HostName.LookupError || Io.QueueClosedError)!void {
const t_io = io(t);
const family_records: [2]struct { af: IpAddress.Family, rr: HostName.DnsRecord } = .{
.{ .af = .ip6, .rr = .A },
@@ -6621,8 +6624,10 @@ fn lookupHosts(
return error.DetectingNetworkConfigurationFailed;
},
},
- error.Canceled => |e| return e,
- error.UnknownHostName => |e| return e,
+ error.Canceled,
+ error.Closed,
+ error.UnknownHostName,
+ => |e| return e,
};
}
@@ -6632,7 +6637,7 @@ fn lookupHostsReader(
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
reader: *Io.Reader,
-) error{ ReadFailed, Canceled, UnknownHostName }!void {
+) error{ ReadFailed, Canceled, UnknownHostName, Closed }!void {
const t_io = io(t);
var addresses_len: usize = 0;
var canonical_name: ?HostName = null;
diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig
index e2638abfaa..628a97d1f8 100644
--- a/lib/std/Io/net/HostName.zig
+++ b/lib/std/Io/net/HostName.zig
@@ -82,19 +82,22 @@ pub const LookupError = error{
pub const LookupResult = union(enum) {
address: IpAddress,
canonical_name: HostName,
- end: LookupError!void,
};
-/// Adds any number of `IpAddress` into resolved, exactly one canonical_name,
-/// and then always finishes by adding one `LookupResult.end` entry.
+/// Adds any number of `LookupResult.address` into `resolved`, and exactly one
+/// `LookupResult.canonical_name`.
///
/// Guaranteed not to block if provided queue has capacity at least 16.
+///
+/// Closes `resolved` before return, even on error.
+///
+/// Asserts `resolved` is not closed until this call returns.
pub fn lookup(
host_name: HostName,
io: Io,
resolved: *Io.Queue(LookupResult),
options: LookupOptions,
-) void {
+) LookupError!void {
return io.vtable.netLookup(io.userdata, host_name, resolved, options);
}
@@ -211,23 +214,25 @@ pub fn connect(
port: u16,
options: IpAddress.ConnectOptions,
) ConnectError!Stream {
- var connect_many_buffer: [32]ConnectManyResult = undefined;
- var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);
+ var connect_many_buffer: [32]IpAddress.ConnectError!Stream = undefined;
+ var connect_many_queue: Io.Queue(IpAddress.ConnectError!Stream) = .init(&connect_many_buffer);
var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
- var saw_end = false;
defer {
- connect_many.cancel(io);
- if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
- .connection => |loser| if (loser) |s| s.close(io) else |_| continue,
- .end => break,
- };
+ connect_many.cancel(io) catch {};
+ while (connect_many_queue.getOneUncancelable(io)) |loser| {
+ if (loser) |s| s.close(io) else |_| {}
+ } else |err| switch (err) {
+ error.Closed => {},
+ }
}
- var aggregate_error: ConnectError = error.UnknownHostName;
+ var ip_connect_error: ?IpAddress.ConnectError = null;
- while (connect_many_queue.getOne(io)) |result| switch (result) {
- .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
+ while (connect_many_queue.getOne(io)) |result| {
+ if (result) |stream| {
+ return stream;
+ } else |err| switch (err) {
error.SystemResources,
error.OptionUnsupported,
error.ProcessFdQuotaExceeded,
@@ -237,66 +242,80 @@ pub fn connect(
error.WouldBlock => return error.Unexpected,
- else => |e| aggregate_error = e,
- },
- .end => |end| {
- saw_end = true;
- try end;
- return aggregate_error;
- },
+ else => |e| ip_connect_error = e,
+ }
} else |err| switch (err) {
error.Canceled => |e| return e,
+ error.Closed => {
+ // There was no successful connection attempt. If there was a lookup error, return that.
+ try connect_many.await(io);
+ // Otherwise, return the error from a failed IP connection attempt.
+ return ip_connect_error orelse
+ return error.UnknownHostName;
+ },
}
}
-pub const ConnectManyResult = union(enum) {
- connection: IpAddress.ConnectError!Stream,
- end: ConnectError!void,
-};
-
/// Asynchronously establishes a connection to all IP addresses associated with
/// a host name, adding them to a results queue upon completion.
+///
+/// Closes `results` before return, even on error.
+///
+/// Asserts `results` is not closed until this call returns.
pub fn connectMany(
host_name: HostName,
io: Io,
port: u16,
- results: *Io.Queue(ConnectManyResult),
+ results: *Io.Queue(IpAddress.ConnectError!Stream),
options: IpAddress.ConnectOptions,
-) void {
+) LookupError!void {
+ defer results.close(io);
+
var canonical_name_buffer: [max_len]u8 = undefined;
var lookup_buffer: [32]HostName.LookupResult = undefined;
var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);
- var group: Io.Group = .init;
- defer group.cancel(io);
-
- group.async(io, lookup, .{ host_name, io, &lookup_queue, .{
+ var lookup_future = io.async(lookup, .{ host_name, io, &lookup_queue, .{
.port = port,
.canonical_name_buffer = &canonical_name_buffer,
} });
+ defer lookup_future.cancel(io) catch {};
+
+ var group: Io.Group = .init;
+ defer group.cancel(io);
while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
.address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
.canonical_name => continue,
- .end => |lookup_result| {
- group.wait(io);
- results.putOneUncancelable(io, .{ .end = lookup_result });
- return;
- },
} else |err| switch (err) {
- error.Canceled => |e| {
- group.cancel(io);
- results.putOneUncancelable(io, .{ .end = e });
+ error.Canceled => |e| return e,
+ error.Closed => {
+ group.wait(io);
+ return lookup_future.await(io);
},
}
}
-
fn enqueueConnection(
address: IpAddress,
io: Io,
- queue: *Io.Queue(ConnectManyResult),
+ queue: *Io.Queue(IpAddress.ConnectError!Stream),
options: IpAddress.ConnectOptions,
) void {
- queue.putOneUncancelable(io, .{ .connection = address.connect(io, options) });
+ enqueueConnectionFallible(address, io, queue, options) catch |err| switch (err) {
+ error.Canceled => {},
+ };
+}
+fn enqueueConnectionFallible(
+ address: IpAddress,
+ io: Io,
+ queue: *Io.Queue(IpAddress.ConnectError!Stream),
+ options: IpAddress.ConnectOptions,
+) Io.Cancelable!void {
+ const result = address.connect(io, options);
+ errdefer if (result) |s| s.close(io) else |_| {};
+ queue.putOne(io, result) catch |err| switch (err) {
+ error.Closed => unreachable, // `queue` must not be closed
+ error.Canceled => |e| return e,
+ };
}
pub const ResolvConf = struct {
diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig
index 2a7a151b5d..e234a9edde 100644
--- a/lib/std/Io/net/test.zig
+++ b/lib/std/Io/net/test.zig
@@ -129,7 +129,7 @@ test "resolve DNS" {
var results_buffer: [32]net.HostName.LookupResult = undefined;
var results: Io.Queue(net.HostName.LookupResult) = .init(&results_buffer);
- net.HostName.lookup(try .init("localhost"), io, &results, .{
+ try net.HostName.lookup(try .init("localhost"), io, &results, .{
.port = 80,
.canonical_name_buffer = &canonical_name_buffer,
});
@@ -142,11 +142,10 @@ test "resolve DNS" {
addresses_found += 1;
},
.canonical_name => |canonical_name| try testing.expectEqualStrings("localhost", canonical_name.bytes),
- .end => |end| {
- try end;
- break;
- },
- } else |err| return err;
+ } else |err| switch (err) {
+ error.Closed => {},
+ error.Canceled => |e| return e,
+ }
try testing.expect(addresses_found != 0);
}
@@ -161,20 +160,19 @@ test "resolve DNS" {
net.HostName.lookup(try .init("example.com"), io, &results, .{
.port = 80,
.canonical_name_buffer = &canonical_name_buffer,
- });
+ }) catch |err| switch (err) {
+ error.UnknownHostName => return error.SkipZigTest,
+ error.NameServerFailure => return error.SkipZigTest,
+ else => |e| return e,
+ };
while (results.getOne(io)) |result| switch (result) {
.address => {},
.canonical_name => {},
- .end => |end| {
- end catch |err| switch (err) {
- error.UnknownHostName => return error.SkipZigTest,
- error.NameServerFailure => return error.SkipZigTest,
- else => return err,
- };
- break;
- },
- } else |err| return err;
+ } else |err| switch (err) {
+ error.Closed => {},
+ error.Canceled => |e| return e,
+ }
}
}
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index 94f280b358..f7965ed14e 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -209,10 +209,10 @@ test "select" {
return;
},
};
- defer if (get_a.cancel(io)) |_| {} else |_| @panic("fail");
+ defer _ = get_a.cancel(io) catch {};
var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io });
- defer if (get_b.cancel(io)) |_| {} else |_| @panic("fail");
+ defer _ = get_b.cancel(io) catch {};
var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake });
defer timeout.cancel(io) catch {};
@@ -225,12 +225,9 @@ test "select" {
.get_a => return error.TestFailure,
.get_b => return error.TestFailure,
.timeout => {
- // Unblock the queues to avoid making this unit test depend on
- // cancellation.
- queue.putOneUncancelable(io, 1);
- queue.putOneUncancelable(io, 1);
- try testing.expectEqual(1, try get_a.await(io));
- try testing.expectEqual(1, try get_b.await(io));
+ queue.close(io);
+ try testing.expectError(error.Closed, get_a.await(io));
+ try testing.expectError(error.Closed, get_b.await(io));
},
}
}
@@ -256,6 +253,54 @@ test "Queue" {
try testQueue(5);
}
+test "Queue.close single-threaded" {
+ const io = std.testing.io;
+
+ var buf: [10]u8 = undefined;
+ var queue: Io.Queue(u8) = .init(&buf);
+
+ try queue.putAll(io, &.{ 0, 1, 2, 3, 4, 5, 6 });
+ try expectEqual(3, try queue.put(io, &.{ 7, 8, 9, 10 }, 0)); // there is capacity for 3 more items
+
+ var get_buf: [4]u8 = undefined;
+
+ // Receive some elements before closing
+ try expectEqual(4, try queue.get(io, &get_buf, 0));
+ try expectEqual(0, get_buf[0]);
+ try expectEqual(1, get_buf[1]);
+ try expectEqual(2, get_buf[2]);
+ try expectEqual(3, get_buf[3]);
+ try expectEqual(4, try queue.getOne(io));
+
+ // ...and add a couple more now there's space
+ try queue.putAll(io, &.{ 20, 21 });
+
+ queue.close(io);
+
+ // Receive more elements *after* closing
+ try expectEqual(4, try queue.get(io, &get_buf, 0));
+ try expectEqual(5, get_buf[0]);
+ try expectEqual(6, get_buf[1]);
+ try expectEqual(7, get_buf[2]);
+ try expectEqual(8, get_buf[3]);
+ try expectEqual(9, try queue.getOne(io));
+
+ // Cannot put anything while closed, even if the buffer has space
+ try expectError(error.Closed, queue.putOne(io, 100));
+ try expectError(error.Closed, queue.putAll(io, &.{ 101, 102 }));
+ try expectError(error.Closed, queue.putUncancelable(io, &.{ 103, 104 }, 0));
+
+ // Even if we ask for 3 items, the queue is closed, so we only get the last 2
+ try expectEqual(2, try queue.get(io, &get_buf, 4));
+ try expectEqual(20, get_buf[0]);
+ try expectEqual(21, get_buf[1]);
+
+ // The queue is now empty, so `get` should return `error.Closed` too
+ try expectError(error.Closed, queue.getOne(io));
+ try expectError(error.Closed, queue.get(io, &get_buf, 0));
+ try expectError(error.Closed, queue.putUncancelable(io, &get_buf, 2));
+}
+
test "Event" {
const global = struct {
fn waitAndRead(io: Io, event: *Io.Event, ptr: *const u32) Io.Cancelable!u32 {