path: root/lib/std/event/batch.zig
author     Andrew Kelley <andrew@ziglang.org>  2020-02-17 22:45:49 -0500
committer  GitHub <noreply@github.com>         2020-02-17 22:45:49 -0500
commit     e8a84927ab989d3c8fb6668dd242ada0a8f99585 (patch)
tree       e0189e9cae971e221f0b76fb4c7e7b096d23959a /lib/std/event/batch.zig
parent     35f0cb049e6a46c8037bab15a9debc21ad1a979e (diff)
parent     99520c4e6936b69e7489262bc35a70300366d395 (diff)
Merge pull request #4478 from ziglang/self-host-libc-detection
self-hosted libc and dynamic linker detection
Diffstat (limited to 'lib/std/event/batch.zig')
-rw-r--r--  lib/std/event/batch.zig  139
1 file changed, 139 insertions, 0 deletions
diff --git a/lib/std/event/batch.zig b/lib/std/event/batch.zig
new file mode 100644
index 0000000000..af59b32490
--- /dev/null
+++ b/lib/std/event/batch.zig
@@ -0,0 +1,139 @@
+const std = @import("../std.zig");
+const testing = std.testing;
+
+/// Performs multiple async functions in parallel, without heap allocation.
+/// Async function frames are managed externally to this abstraction, and
+/// passed in via the `add` function. Once all the jobs are added, call `wait`.
+/// This API is *not* thread-safe. The object must be accessed from only one thread
+/// at a time; however, it need not be the same thread.
+pub fn Batch(
+    /// The return value for each job.
+    /// If a job slot was re-used due to maxed-out concurrency, then its result value
+    /// will be overwritten. The values can be accessed via the `result` field of each `jobs` element.
+    comptime Result: type,
+    /// How many jobs to run in parallel.
+    comptime max_jobs: comptime_int,
+    /// Controls whether the `add` and `wait` functions will be async functions.
+    comptime async_behavior: enum {
+        /// Observe the value of `std.io.is_async` to decide whether `add`
+        /// and `wait` will be async functions. Asserts that the jobs do not suspend when
+        /// `std.io.mode == .blocking`. This is a generally safe assumption, and the
+        /// usual recommended option for this parameter.
+        auto_async,
+
+        /// Always uses the `noasync` keyword when using `await` on the jobs,
+        /// making `add` and `wait` non-async functions. Asserts that the jobs do not suspend.
+        never_async,
+
+        /// `add` and `wait` use regular `await` keyword, making them async functions.
+        always_async,
+    },
+) type {
+    return struct {
+        jobs: [max_jobs]Job,
+        next_job_index: usize,
+        collected_result: CollectedResult,
+
+        const Job = struct {
+            frame: ?anyframe->Result,
+            result: Result,
+        };
+
+        const Self = @This();
+
+        const CollectedResult = switch (@typeInfo(Result)) {
+            .ErrorUnion => Result,
+            else => void,
+        };
+
+        const async_ok = switch (async_behavior) {
+            .auto_async => std.io.is_async,
+            .never_async => false,
+            .always_async => true,
+        };
+
+        pub fn init() Self {
+            return Self{
+                .jobs = [1]Job{
+                    .{
+                        .frame = null,
+                        .result = undefined,
+                    },
+                } ** max_jobs,
+                .next_job_index = 0,
+                .collected_result = {},
+            };
+        }
+
+        /// Add a frame to the Batch. If all jobs are in-flight, then this function
+        /// waits until one completes.
+        /// This function is *not* thread-safe. It must be called from only one thread
+        /// at a time; however, it need not be the same thread.
+        /// TODO: "select" language feature to use the next available slot, rather than
+        /// awaiting the next index.
+        pub fn add(self: *Self, frame: anyframe->Result) void {
+            const job = &self.jobs[self.next_job_index];
+            self.next_job_index = (self.next_job_index + 1) % max_jobs;
+            if (job.frame) |existing| {
+                job.result = if (async_ok) await existing else noasync await existing;
+                if (CollectedResult != void) {
+                    job.result catch |err| {
+                        self.collected_result = err;
+                    };
+                }
+            }
+            job.frame = frame;
+        }
+
+        /// Wait for all the jobs to complete.
+        /// Safe to call any number of times.
+        /// If `Result` is an error union, this function returns the last error that occurred, if any.
+        /// Unlike the per-job `result` values, the return value of `wait` will report any error that
+        /// occurred; hitting max parallelism will not compromise the result.
+        /// This function is *not* thread-safe. It must be called from only one thread
+        /// at a time; however, it need not be the same thread.
+        pub fn wait(self: *Self) CollectedResult {
+            for (self.jobs) |*job| if (job.frame) |f| {
+                job.result = if (async_ok) await f else noasync await f;
+                if (CollectedResult != void) {
+                    job.result catch |err| {
+                        self.collected_result = err;
+                    };
+                }
+                job.frame = null;
+            };
+            return self.collected_result;
+        }
+    };
+}
+
+test "std.event.Batch" {
+    var count: usize = 0;
+    var batch = Batch(void, 2, .auto_async).init();
+    batch.add(&async sleepALittle(&count));
+    batch.add(&async increaseByTen(&count));
+    batch.wait();
+    testing.expect(count == 11);
+
+    var another = Batch(anyerror!void, 2, .auto_async).init();
+    another.add(&async somethingElse());
+    another.add(&async doSomethingThatFails());
+    testing.expectError(error.ItBroke, another.wait());
+}
+
+fn sleepALittle(count: *usize) void {
+    std.time.sleep(1 * std.time.millisecond);
+    _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
+}
+
+fn increaseByTen(count: *usize) void {
+    var i: usize = 0;
+    while (i < 10) : (i += 1) {
+        _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
+    }
+}
+
+fn doSomethingThatFails() anyerror!void {}
+fn somethingElse() anyerror!void {
+    return error.ItBroke;
+}
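
For readers skimming the new API, here is a minimal usage sketch (not part of this commit). It assumes the file is exposed as `std.event.Batch`, that `std.io.mode == .blocking` so no event loop is required, and the worker function `addToTotal` is hypothetical:

const std = @import("std");
const Batch = std.event.Batch;

// Hypothetical worker: atomically adds `n` to a shared total.
fn addToTotal(total: *usize, n: usize) void {
    _ = @atomicRmw(usize, total, .Add, n, .SeqCst);
}

pub fn main() void {
    var total: usize = 0;
    // At most 2 jobs in flight; `.auto_async` follows `std.io.is_async`.
    var batch = Batch(void, 2, .auto_async).init();
    batch.add(&async addToTotal(&total, 1));
    batch.add(&async addToTotal(&total, 2));
    // A third `add` re-uses a slot, awaiting the oldest job first.
    batch.add(&async addToTotal(&total, 3));
    batch.wait();
    std.debug.warn("total = {}\n", .{total}); // prints "total = 6"
}

With `Result` set to an error union (as in the second test case above), `wait` would instead return the last error collected from any job.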