aboutsummaryrefslogtreecommitdiff
path: root/src/Package/Fetch/git.zig
diff options
context:
space:
mode:
authorIan Johnson <ian@ianjohnson.dev>2024-02-18 16:39:03 -0500
committerAndrew Kelley <andrew@ziglang.org>2024-02-19 13:43:32 -0800
commit80f3ef6e14a1213d1c3b31d515870afb43cc9379 (patch)
treebaeb9d69b66b63cc8d08588e9ab94b12ebc68534 /src/Package/Fetch/git.zig
parent5c25ad0fda23eca4b98aad48d709b3b1b9caf2d6 (diff)
downloadzig-80f3ef6e14a1213d1c3b31d515870afb43cc9379.tar.gz
zig-80f3ef6e14a1213d1c3b31d515870afb43cc9379.zip
Package.Fetch: fix Git package fetching
This commit works around #18967 by adding an `AccumulatingReader`, which accumulates data read from the underlying packfile, and by keeping track of the position in the packfile and hash/checksum information separately rather than using reader composition. That is, the packfile position and hashes/checksums are updated with the accumulated read history data only after we can determine what data has actually been used by the decompressor rather than merely being buffered. The only addition to the standard library APIs to support this change is the `unreadBytes` function in `std.compress.flate.Inflate`, which allows the user to determine how many bytes have been read only for buffering and not used as part of compressed data. These changes can be reverted if #18967 is resolved with a decompressor that reads precisely only the number of bytes needed for decompression.
Diffstat (limited to 'src/Package/Fetch/git.zig')
-rw-r--r--src/Package/Fetch/git.zig179
1 files changed, 155 insertions, 24 deletions
diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig
index abbb031948..db50ddfab7 100644
--- a/src/Package/Fetch/git.zig
+++ b/src/Package/Fetch/git.zig
@@ -1091,6 +1091,86 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
try index_writer.writeAll(&index_checksum);
}
+/// A reader that stores read data in a growable internal buffer. The read
+/// position can be rewound to allow previously read data to be read again.
+fn AccumulatingReader(comptime ReaderType: type) type {
+ return struct {
+ child_reader: ReaderType,
+ buffer: std.ArrayListUnmanaged(u8) = .{},
+ /// The position in `buffer` from which reads should start, returning
+ /// buffered data. If this is `buffer.items.len`, data will be read from
+ /// `child_reader` instead.
+ read_start: usize = 0,
+ allocator: Allocator,
+
+ const Self = @This();
+
+ fn deinit(self: *Self) void {
+ self.buffer.deinit(self.allocator);
+ self.* = undefined;
+ }
+
+ const ReadError = ReaderType.Error || Allocator.Error;
+ const Reader = std.io.Reader(*Self, ReadError, read);
+
+ fn read(self: *Self, buf: []u8) ReadError!usize {
+ if (self.read_start < self.buffer.items.len) {
+ // Previously buffered data is available and should be used
+ // before reading more data from the underlying reader.
+ const available = self.buffer.items.len - self.read_start;
+ const count = @min(available, buf.len);
+ @memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]);
+ self.read_start += count;
+ return count;
+ }
+
+ try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
+ const read_buffer = self.buffer.unusedCapacitySlice();
+ const count = try self.child_reader.read(read_buffer[0..buf.len]);
+ @memcpy(buf[0..count], read_buffer[0..count]);
+ self.buffer.items.len += count;
+ self.read_start += count;
+ return count;
+ }
+
+ fn reader(self: *Self) Reader {
+ return .{ .context = self };
+ }
+
+ /// Returns a slice of the buffered data that has already been read,
+ /// except the last `count_before_end` bytes.
+ fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
+ assert(count_before_end <= self.read_start);
+ return self.buffer.items[0 .. self.read_start - count_before_end];
+ }
+
+ /// Discards the first `count` bytes of buffered data.
+ fn discard(self: *Self, count: usize) void {
+ assert(count <= self.buffer.items.len);
+ const retain = self.buffer.items.len - count;
+ mem.copyForwards(
+ u8,
+ self.buffer.items[0..retain],
+ self.buffer.items[count..][0..retain],
+ );
+ self.buffer.items.len = retain;
+ self.read_start -= @min(self.read_start, count);
+ }
+
+ /// Rewinds the read position to the beginning of buffered data.
+ fn rewind(self: *Self) void {
+ self.read_start = 0;
+ }
+ };
+}
+
+fn accumulatingReader(
+ allocator: Allocator,
+ reader: anytype,
+) AccumulatingReader(@TypeOf(reader)) {
+ return .{ .child_reader = reader, .allocator = allocator };
+}
+
/// Performs the first pass over the packfile data for index construction.
/// This will index all non-delta objects, queue delta objects for further
/// processing, and return the pack checksum (which is part of the index
@@ -1102,59 +1182,106 @@ fn indexPackFirstPass(
pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
) ![Sha1.digest_length]u8 {
var pack_buffered_reader = std.io.bufferedReader(pack.reader());
- var pack_counting_reader = std.io.countingReader(pack_buffered_reader.reader());
- var pack_hashed_reader = std.compress.hashedReader(pack_counting_reader.reader(), Sha1.init(.{}));
- const pack_reader = pack_hashed_reader.reader();
+ var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
+ defer pack_accumulating_reader.deinit();
+ var pack_position: usize = 0;
+ var pack_hash = Sha1.init(.{});
+ const pack_reader = pack_accumulating_reader.reader();
const pack_header = try PackHeader.read(pack_reader);
+ const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
+ pack_position += pack_header_bytes.len;
+ pack_hash.update(pack_header_bytes);
+ pack_accumulating_reader.discard(pack_header_bytes.len);
var current_entry: u32 = 0;
while (current_entry < pack_header.total_objects) : (current_entry += 1) {
- const entry_offset = pack_counting_reader.bytes_read;
- var entry_crc32_reader = std.compress.hashedReader(pack_reader, std.hash.Crc32.init());
- const entry_header = try EntryHeader.read(entry_crc32_reader.reader());
+ const entry_offset = pack_position;
+ var entry_crc32 = std.hash.Crc32.init();
+
+ const entry_header = try EntryHeader.read(pack_reader);
+ const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
+ pack_position += entry_header_bytes.len;
+ pack_hash.update(entry_header_bytes);
+ entry_crc32.update(entry_header_bytes);
+ pack_accumulating_reader.discard(entry_header_bytes.len);
+
switch (entry_header) {
- inline .commit, .tree, .blob, .tag => |object, tag| {
- var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
- var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
+ .commit, .tree, .blob, .tag => |object| {
+ var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
+ var entry_data_size: usize = 0;
var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
const entry_writer = entry_hashed_writer.writer();
+
// The object header is not included in the pack data but is
// part of the object's ID
- try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
- var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
- try fifo.pump(entry_counting_reader.reader(), entry_writer);
- if (entry_counting_reader.bytes_read != object.uncompressed_length) {
+ try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });
+
+ while (try entry_decompress_stream.next()) |decompressed_data| {
+ entry_data_size += decompressed_data.len;
+ try entry_writer.writeAll(decompressed_data);
+
+ const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+ pack_position += compressed_bytes.len;
+ pack_hash.update(compressed_bytes);
+ entry_crc32.update(compressed_bytes);
+ pack_accumulating_reader.discard(compressed_bytes.len);
+ }
+ const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+ pack_position += footer_bytes.len;
+ pack_hash.update(footer_bytes);
+ entry_crc32.update(footer_bytes);
+ pack_accumulating_reader.discard(footer_bytes.len);
+ pack_accumulating_reader.rewind();
+
+ if (entry_data_size != object.uncompressed_length) {
return error.InvalidObject;
}
+
const oid = entry_hashed_writer.hasher.finalResult();
try index_entries.put(allocator, oid, .{
.offset = entry_offset,
- .crc32 = entry_crc32_reader.hasher.final(),
+ .crc32 = entry_crc32.final(),
});
},
inline .ofs_delta, .ref_delta => |delta| {
- var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
- var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
- var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
- try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
- if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
+ var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
+ var entry_data_size: usize = 0;
+
+ while (try entry_decompress_stream.next()) |decompressed_data| {
+ entry_data_size += decompressed_data.len;
+
+ const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+ pack_position += compressed_bytes.len;
+ pack_hash.update(compressed_bytes);
+ entry_crc32.update(compressed_bytes);
+ pack_accumulating_reader.discard(compressed_bytes.len);
+ }
+ const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+ pack_position += footer_bytes.len;
+ pack_hash.update(footer_bytes);
+ entry_crc32.update(footer_bytes);
+ pack_accumulating_reader.discard(footer_bytes.len);
+ pack_accumulating_reader.rewind();
+
+ if (entry_data_size != delta.uncompressed_length) {
return error.InvalidObject;
}
+
try pending_deltas.append(allocator, .{
.offset = entry_offset,
- .crc32 = entry_crc32_reader.hasher.final(),
+ .crc32 = entry_crc32.final(),
});
},
}
}
- const pack_checksum = pack_hashed_reader.hasher.finalResult();
- const recorded_checksum = try pack_buffered_reader.reader().readBytesNoEof(Sha1.digest_length);
+ const pack_checksum = pack_hash.finalResult();
+ const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
return error.CorruptedPack;
}
- _ = pack_buffered_reader.reader().readByte() catch |e| switch (e) {
+ _ = pack_reader.readByte() catch |e| switch (e) {
error.EndOfStream => return pack_checksum,
else => |other| return other,
};
@@ -1385,7 +1512,11 @@ test "packfile indexing and checkout" {
defer worktree.cleanup();
const commit_id = try parseOid("dd582c0720819ab7130b103635bd7271b9fd4feb");
- try repository.checkout(worktree.dir, commit_id);
+
+ var diagnostics: Diagnostics = .{ .allocator = testing.allocator };
+ defer diagnostics.deinit();
+ try repository.checkout(worktree.dir, commit_id, &diagnostics);
+ try testing.expect(diagnostics.errors.items.len == 0);
const expected_files: []const []const u8 = &.{
"dir/file",