diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2024-02-09 13:38:42 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-02-09 13:38:42 -0800 |
| commit | 54bbc73f8502fe073d385361ddb34a43d12eec39 (patch) | |
| tree | 6eb554a4639f2c05bdfa5fc94553edc7f5df1650 /lib/std/event/batch.zig | |
| parent | d3cf911a803912a5aabe7a8d130c8b6468b95ff1 (diff) | |
| parent | 318e9cdaaae2f01c1a6e5db9cf66fe875821787e (diff) | |
| download | zig-54bbc73f8502fe073d385361ddb34a43d12eec39.tar.gz zig-54bbc73f8502fe073d385361ddb34a43d12eec39.zip | |
Merge pull request #18712 from Vexu/std.options
std: make options a struct instance instead of a namespace
Diffstat (limited to 'lib/std/event/batch.zig')
| -rw-r--r-- | lib/std/event/batch.zig | 141 |
1 files changed, 0 insertions, 141 deletions
diff --git a/lib/std/event/batch.zig b/lib/std/event/batch.zig deleted file mode 100644 index 9703a2512e..0000000000 --- a/lib/std/event/batch.zig +++ /dev/null @@ -1,141 +0,0 @@ -const std = @import("../std.zig"); -const testing = std.testing; - -/// Performs multiple async functions in parallel, without heap allocation. -/// Async function frames are managed externally to this abstraction, and -/// passed in via the `add` function. Once all the jobs are added, call `wait`. -/// This API is *not* thread-safe. The object must be accessed from one thread at -/// a time, however, it need not be the same thread. -pub fn Batch( - /// The return value for each job. - /// If a job slot was re-used due to maxed out concurrency, then its result - /// value will be overwritten. The values can be accessed with the `results` field. - comptime Result: type, - /// How many jobs to run in parallel. - comptime max_jobs: comptime_int, - /// Controls whether the `add` and `wait` functions will be async functions. - comptime async_behavior: enum { - /// Observe the value of `std.io.is_async` to decide whether `add` - /// and `wait` will be async functions. Asserts that the jobs do not suspend when - /// `std.options.io_mode == .blocking`. This is a generally safe assumption, and the - /// usual recommended option for this parameter. - auto_async, - - /// Always uses the `nosuspend` keyword when using `await` on the jobs, - /// making `add` and `wait` non-async functions. Asserts that the jobs do not suspend. - never_async, - - /// `add` and `wait` use regular `await` keyword, making them async functions. - always_async, - }, -) type { - return struct { - jobs: [max_jobs]Job, - next_job_index: usize, - collected_result: CollectedResult, - - const Job = struct { - frame: ?anyframe->Result, - result: Result, - }; - - const Self = @This(); - - const CollectedResult = switch (@typeInfo(Result)) { - .ErrorUnion => Result, - else => void, - }; - - const async_ok = switch (async_behavior) { - .auto_async => std.io.is_async, - .never_async => false, - .always_async => true, - }; - - pub fn init() Self { - return Self{ - .jobs = [1]Job{ - .{ - .frame = null, - .result = undefined, - }, - } ** max_jobs, - .next_job_index = 0, - .collected_result = {}, - }; - } - - /// Add a frame to the Batch. If all jobs are in-flight, then this function - /// waits until one completes. - /// This function is *not* thread-safe. It must be called from one thread at - /// a time, however, it need not be the same thread. - /// TODO: "select" language feature to use the next available slot, rather than - /// awaiting the next index. - pub fn add(self: *Self, frame: anyframe->Result) void { - const job = &self.jobs[self.next_job_index]; - self.next_job_index = (self.next_job_index + 1) % max_jobs; - if (job.frame) |existing| { - job.result = if (async_ok) await existing else nosuspend await existing; - if (CollectedResult != void) { - job.result catch |err| { - self.collected_result = err; - }; - } - } - job.frame = frame; - } - - /// Wait for all the jobs to complete. - /// Safe to call any number of times. - /// If `Result` is an error union, this function returns the last error that occurred, if any. - /// Unlike the `results` field, the return value of `wait` will report any error that occurred; - /// hitting max parallelism will not compromise the result. - /// This function is *not* thread-safe. It must be called from one thread at - /// a time, however, it need not be the same thread. - pub fn wait(self: *Self) CollectedResult { - for (self.jobs) |*job| - if (job.frame) |f| { - job.result = if (async_ok) await f else nosuspend await f; - if (CollectedResult != void) { - job.result catch |err| { - self.collected_result = err; - }; - } - job.frame = null; - }; - return self.collected_result; - } - }; -} - -test "std.event.Batch" { - if (true) return error.SkipZigTest; - var count: usize = 0; - var batch = Batch(void, 2, .auto_async).init(); - batch.add(&async sleepALittle(&count)); - batch.add(&async increaseByTen(&count)); - batch.wait(); - try testing.expect(count == 11); - - var another = Batch(anyerror!void, 2, .auto_async).init(); - another.add(&async somethingElse()); - another.add(&async doSomethingThatFails()); - try testing.expectError(error.ItBroke, another.wait()); -} - -fn sleepALittle(count: *usize) void { - std.time.sleep(1 * std.time.ns_per_ms); - _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); -} - -fn increaseByTen(count: *usize) void { - var i: usize = 0; - while (i < 10) : (i += 1) { - _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); - } -} - -fn doSomethingThatFails() anyerror!void {} -fn somethingElse() anyerror!void { - return error.ItBroke; -} |
