diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-02-17 22:45:49 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-02-17 22:45:49 -0500 |
| commit | e8a84927ab989d3c8fb6668dd242ada0a8f99585 (patch) | |
| tree | e0189e9cae971e221f0b76fb4c7e7b096d23959a /lib/std/event/batch.zig | |
| parent | 35f0cb049e6a46c8037bab15a9debc21ad1a979e (diff) | |
| parent | 99520c4e6936b69e7489262bc35a70300366d395 (diff) | |
| download | zig-e8a84927ab989d3c8fb6668dd242ada0a8f99585.tar.gz zig-e8a84927ab989d3c8fb6668dd242ada0a8f99585.zip | |
Merge pull request #4478 from ziglang/self-host-libc-detection
self-hosted libc and dynamic linker detection
Diffstat (limited to 'lib/std/event/batch.zig')
| -rw-r--r-- | lib/std/event/batch.zig | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/lib/std/event/batch.zig b/lib/std/event/batch.zig new file mode 100644 index 0000000000..af59b32490 --- /dev/null +++ b/lib/std/event/batch.zig @@ -0,0 +1,139 @@ +const std = @import("../std.zig"); +const testing = std.testing; + +/// Performs multiple async functions in parallel, without heap allocation. +/// Async function frames are managed externally to this abstraction, and +/// passed in via the `add` function. Once all the jobs are added, call `wait`. +/// This API is *not* thread-safe. The object must be accessed from one thread at +/// a time, however, it need not be the same thread. +pub fn Batch( + /// The return value for each job. + /// If a job slot was re-used due to maxed out concurrency, then its result + /// value will be overwritten. The values can be accessed with the `results` field. + comptime Result: type, + /// How many jobs to run in parallel. + comptime max_jobs: comptime_int, + /// Controls whether the `add` and `wait` functions will be async functions. + comptime async_behavior: enum { + /// Observe the value of `std.io.is_async` to decide whether `add` + /// and `wait` will be async functions. Asserts that the jobs do not suspend when + /// `std.io.mode == .blocking`. This is a generally safe assumption, and the + /// usual recommended option for this parameter. + auto_async, + + /// Always uses the `noasync` keyword when using `await` on the jobs, + /// making `add` and `wait` non-async functions. Asserts that the jobs do not suspend. + never_async, + + /// `add` and `wait` use regular `await` keyword, making them async functions. + always_async, + }, +) type { + return struct { + jobs: [max_jobs]Job, + next_job_index: usize, + collected_result: CollectedResult, + + const Job = struct { + frame: ?anyframe->Result, + result: Result, + }; + + const Self = @This(); + + const CollectedResult = switch (@typeInfo(Result)) { + .ErrorUnion => Result, + else => void, + }; + + const async_ok = switch (async_behavior) { + .auto_async => std.io.is_async, + .never_async => false, + .always_async => true, + }; + + pub fn init() Self { + return Self{ + .jobs = [1]Job{ + .{ + .frame = null, + .result = undefined, + }, + } ** max_jobs, + .next_job_index = 0, + .collected_result = {}, + }; + } + + /// Add a frame to the Batch. If all jobs are in-flight, then this function + /// waits until one completes. + /// This function is *not* thread-safe. It must be called from one thread at + /// a time, however, it need not be the same thread. + /// TODO: "select" language feature to use the next available slot, rather than + /// awaiting the next index. + pub fn add(self: *Self, frame: anyframe->Result) void { + const job = &self.jobs[self.next_job_index]; + self.next_job_index = (self.next_job_index + 1) % max_jobs; + if (job.frame) |existing| { + job.result = if (async_ok) await existing else noasync await existing; + if (CollectedResult != void) { + job.result catch |err| { + self.collected_result = err; + }; + } + } + job.frame = frame; + } + + /// Wait for all the jobs to complete. + /// Safe to call any number of times. + /// If `Result` is an error union, this function returns the last error that occurred, if any. + /// Unlike the `results` field, the return value of `wait` will report any error that occurred; + /// hitting max parallelism will not compromise the result. + /// This function is *not* thread-safe. It must be called from one thread at + /// a time, however, it need not be the same thread. + pub fn wait(self: *Self) CollectedResult { + for (self.jobs) |*job| if (job.frame) |f| { + job.result = if (async_ok) await f else noasync await f; + if (CollectedResult != void) { + job.result catch |err| { + self.collected_result = err; + }; + } + job.frame = null; + }; + return self.collected_result; + } + }; +} + +test "std.event.Batch" { + var count: usize = 0; + var batch = Batch(void, 2, .auto_async).init(); + batch.add(&async sleepALittle(&count)); + batch.add(&async increaseByTen(&count)); + batch.wait(); + testing.expect(count == 11); + + var another = Batch(anyerror!void, 2, .auto_async).init(); + another.add(&async somethingElse()); + another.add(&async doSomethingThatFails()); + testing.expectError(error.ItBroke, another.wait()); +} + +fn sleepALittle(count: *usize) void { + std.time.sleep(1 * std.time.millisecond); + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); +} + +fn increaseByTen(count: *usize) void { + var i: usize = 0; + while (i < 10) : (i += 1) { + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); + } +} + +fn doSomethingThatFails() anyerror!void {} +fn somethingElse() anyerror!void { + return error.ItBroke; +} |
