// Source: src/ThreadSafeQueue.zig (blob 74bbdc418f4326fb8e7992c9b1f39846f5715ba6)
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;

/// Returns a queue type that hands batches of `T` from producer threads to
/// the worker thread.
///
/// Producers append to `shared` while holding `mutex`. The worker collects
/// everything that has accumulated in one step by exchanging `shared` with
/// its own `worker_owned` list, so the lock is only held for the exchange.
pub fn ThreadSafeQueue(comptime T: type) type {
    return struct {
        /// Not guarded by `mutex`, except while it is being exchanged with
        /// `shared` inside `check`.
        worker_owned: List,
        /// Protected by `mutex`.
        shared: List,
        mutex: std.Thread.Mutex,
        /// Read and written under `mutex` by `check` and `enqueue`; `start`
        /// accesses it without locking.
        state: State,

        const Self = @This();
        const List = std.ArrayListUnmanaged(T);

        /// `.wait`: set by `check` when it found nothing pending.
        /// `.run`: set by `enqueue` or `start` once there is work to pick up.
        pub const State = enum { wait, run };

        /// A queue holding no items, in the `.wait` state.
        pub const empty: Self = .{
            .worker_owned = .empty,
            .shared = .empty,
            .mutex = .{},
            .state = .wait,
        };

        /// Frees both lists with `gpa` and invalidates the queue.
        pub fn deinit(self: *Self, gpa: Allocator) void {
            self.worker_owned.deinit(gpa);
            self.shared.deinit(gpa);
            self.* = undefined;
        }

        /// Must be called from the worker thread.
        ///
        /// Returns every item enqueued since the previous call, or null after
        /// switching the state to `.wait` when nothing is pending.
        ///
        /// The returned slice points into the buffer kept by `worker_owned`.
        /// The next call to `check` hands that buffer back to `shared`, so the
        /// slice should be consumed before then.
        pub fn check(self: *Self) ?[]T {
            assert(self.worker_owned.items.len == 0);

            // Only the state test and the list exchange happen under the lock.
            const took_batch = locked: {
                self.mutex.lock();
                defer self.mutex.unlock();

                assert(self.state == .run);
                if (self.shared.items.len != 0) {
                    std.mem.swap(List, &self.worker_owned, &self.shared);
                    break :locked true;
                }
                self.state = .wait;
                break :locked false;
            };
            if (!took_batch) return null;

            // Capture the slice first; clearing only resets the length, so the
            // memory behind `batch` stays allocated.
            const batch = self.worker_owned.items;
            self.worker_owned.clearRetainingCapacity();
            return batch;
        }

        /// Appends `items` to the queue. Thread-safe, but not safe to call
        /// from the worker thread.
        ///
        /// Returns true if and only if the worker thread was waiting; in that
        /// case the state has been switched to `.run`. Fails with
        /// `error.OutOfMemory` if the shared list cannot grow, leaving the
        /// state untouched.
        pub fn enqueue(self: *Self, gpa: Allocator, items: []const T) error{OutOfMemory}!bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            try self.shared.appendSlice(gpa, items);

            if (self.state == .run) return false;
            self.state = .run;
            return true;
        }

        /// Safe only to call exactly once when initially starting the worker.
        ///
        /// Returns true and switches to `.run` if items are already queued;
        /// otherwise returns false and leaves the state at `.wait`.
        pub fn start(self: *Self) bool {
            assert(self.state == .wait);
            const has_pending = self.shared.items.len != 0;
            if (has_pending) self.state = .run;
            return has_pending;
        }
    };
}