| author | Igor Anić <igor.anic@gmail.com> | 2024-02-22 15:18:03 +0100 |
|---|---|---|
| committer | Igor Anić <igor.anic@gmail.com> | 2024-02-22 15:18:03 +0100 |
| commit | d00faa2407cdeaa058da62f2d95f64f9e7ed6a09 (patch) | |
| tree | 4d15f066681c11af7effe028454ea83bc7beb58a /src/Package/Fetch/git.zig | |
| parent | eb67fab2d9e8540b9052e4b0d3cd0149da9d9962 (diff) | |
| download | zig-d00faa2407cdeaa058da62f2d95f64f9e7ed6a09.tar.gz, zig-d00faa2407cdeaa058da62f2d95f64f9e7ed6a09.zip | |
use BufferedTee in Fetch/git.zig
Diffstat (limited to 'src/Package/Fetch/git.zig')
| -rw-r--r-- | src/Package/Fetch/git.zig | 172 |
|---|---|---|

1 file changed, 26 insertions(+), 146 deletions(-)
```diff
diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig
index db50ddfab7..dd6f63d177 100644
--- a/src/Package/Fetch/git.zig
+++ b/src/Package/Fetch/git.zig
@@ -1091,87 +1091,7 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
     try index_writer.writeAll(&index_checksum);
 }
 
-/// A reader that stores read data in a growable internal buffer. The read
-/// position can be rewound to allow previously read data to be read again.
-fn AccumulatingReader(comptime ReaderType: type) type {
-    return struct {
-        child_reader: ReaderType,
-        buffer: std.ArrayListUnmanaged(u8) = .{},
-        /// The position in `buffer` from which reads should start, returning
-        /// buffered data. If this is `buffer.items.len`, data will be read from
-        /// `child_reader` instead.
-        read_start: usize = 0,
-        allocator: Allocator,
-
-        const Self = @This();
-
-        fn deinit(self: *Self) void {
-            self.buffer.deinit(self.allocator);
-            self.* = undefined;
-        }
-
-        const ReadError = ReaderType.Error || Allocator.Error;
-        const Reader = std.io.Reader(*Self, ReadError, read);
-
-        fn read(self: *Self, buf: []u8) ReadError!usize {
-            if (self.read_start < self.buffer.items.len) {
-                // Previously buffered data is available and should be used
-                // before reading more data from the underlying reader.
-                const available = self.buffer.items.len - self.read_start;
-                const count = @min(available, buf.len);
-                @memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]);
-                self.read_start += count;
-                return count;
-            }
-
-            try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
-            const read_buffer = self.buffer.unusedCapacitySlice();
-            const count = try self.child_reader.read(read_buffer[0..buf.len]);
-            @memcpy(buf[0..count], read_buffer[0..count]);
-            self.buffer.items.len += count;
-            self.read_start += count;
-            return count;
-        }
-
-        fn reader(self: *Self) Reader {
-            return .{ .context = self };
-        }
-
-        /// Returns a slice of the buffered data that has already been read,
-        /// except the last `count_before_end` bytes.
-        fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
-            assert(count_before_end <= self.read_start);
-            return self.buffer.items[0 .. self.read_start - count_before_end];
-        }
-
-        /// Discards the first `count` bytes of buffered data.
-        fn discard(self: *Self, count: usize) void {
-            assert(count <= self.buffer.items.len);
-            const retain = self.buffer.items.len - count;
-            mem.copyForwards(
-                u8,
-                self.buffer.items[0..retain],
-                self.buffer.items[count..][0..retain],
-            );
-            self.buffer.items.len = retain;
-            self.read_start -= @min(self.read_start, count);
-        }
-
-        /// Rewinds the read position to the beginning of buffered data.
-        fn rewind(self: *Self) void {
-            self.read_start = 0;
-        }
-    };
-}
-
-fn accumulatingReader(
-    allocator: Allocator,
-    reader: anytype,
-) AccumulatingReader(@TypeOf(reader)) {
-    return .{ .child_reader = reader, .allocator = allocator };
-}
-
-/// Performs the first pass over the packfile data for index construction.
+// Performs the first pass over the packfile data for index construction.
 /// This will index all non-delta objects, queue delta objects for further
 /// processing, and return the pack checksum (which is part of the index
 /// format).
@@ -1181,102 +1101,62 @@ fn indexPackFirstPass(
     index_entries: *std.AutoHashMapUnmanaged(Oid, IndexEntry),
     pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
 ) ![Sha1.digest_length]u8 {
-    var pack_buffered_reader = std.io.bufferedReader(pack.reader());
-    var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
-    defer pack_accumulating_reader.deinit();
-    var pack_position: usize = 0;
-    var pack_hash = Sha1.init(.{});
-    const pack_reader = pack_accumulating_reader.reader();
+    var pack_counting_writer = std.io.countingWriter(std.io.null_writer);
+    var pack_hashed_writer = std.compress.hashedWriter(pack_counting_writer.writer(), Sha1.init(.{}));
+    var entry_crc32_writer = std.compress.hashedWriter(pack_hashed_writer.writer(), std.hash.Crc32.init());
+    var pack_buffered_reader = std.io.bufferedTee(4096, 8, pack.reader(), entry_crc32_writer.writer());
+    const pack_reader = pack_buffered_reader.reader();
 
     const pack_header = try PackHeader.read(pack_reader);
-    const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
-    pack_position += pack_header_bytes.len;
-    pack_hash.update(pack_header_bytes);
-    pack_accumulating_reader.discard(pack_header_bytes.len);
+    try pack_buffered_reader.flush();
 
     var current_entry: u32 = 0;
     while (current_entry < pack_header.total_objects) : (current_entry += 1) {
-        const entry_offset = pack_position;
-        var entry_crc32 = std.hash.Crc32.init();
-
+        const entry_offset = pack_counting_writer.bytes_written;
+        entry_crc32_writer.hasher = std.hash.Crc32.init(); // reset hasher
        const entry_header = try EntryHeader.read(pack_reader);
-        const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
-        pack_position += entry_header_bytes.len;
-        pack_hash.update(entry_header_bytes);
-        entry_crc32.update(entry_header_bytes);
-        pack_accumulating_reader.discard(entry_header_bytes.len);
         switch (entry_header) {
-            .commit, .tree, .blob, .tag => |object| {
+            inline .commit, .tree, .blob, .tag => |object, tag| {
                 var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
-                var entry_data_size: usize = 0;
+                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
                 var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
                 const entry_writer = entry_hashed_writer.writer();
-
                 // The object header is not included in the pack data but is
                 // part of the object's ID
-                try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });
-
-                while (try entry_decompress_stream.next()) |decompressed_data| {
-                    entry_data_size += decompressed_data.len;
-                    try entry_writer.writeAll(decompressed_data);
-
-                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                    pack_position += compressed_bytes.len;
-                    pack_hash.update(compressed_bytes);
-                    entry_crc32.update(compressed_bytes);
-                    pack_accumulating_reader.discard(compressed_bytes.len);
-                }
-                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                pack_position += footer_bytes.len;
-                pack_hash.update(footer_bytes);
-                entry_crc32.update(footer_bytes);
-                pack_accumulating_reader.discard(footer_bytes.len);
-                pack_accumulating_reader.rewind();
-
-                if (entry_data_size != object.uncompressed_length) {
+                try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
+                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
+                try fifo.pump(entry_counting_reader.reader(), entry_writer);
+                if (entry_counting_reader.bytes_read != object.uncompressed_length) {
                     return error.InvalidObject;
                 }
-
                 const oid = entry_hashed_writer.hasher.finalResult();
+                pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes());
+                try pack_buffered_reader.flush();
                 try index_entries.put(allocator, oid, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32.final(),
+                    .crc32 = entry_crc32_writer.hasher.final(),
                 });
             },
             inline .ofs_delta, .ref_delta => |delta| {
                 var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
-                var entry_data_size: usize = 0;
-
-                while (try entry_decompress_stream.next()) |decompressed_data| {
-                    entry_data_size += decompressed_data.len;
-
-                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                    pack_position += compressed_bytes.len;
-                    pack_hash.update(compressed_bytes);
-                    entry_crc32.update(compressed_bytes);
-                    pack_accumulating_reader.discard(compressed_bytes.len);
-                }
-                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                pack_position += footer_bytes.len;
-                pack_hash.update(footer_bytes);
-                entry_crc32.update(footer_bytes);
-                pack_accumulating_reader.discard(footer_bytes.len);
-                pack_accumulating_reader.rewind();
-
-                if (entry_data_size != delta.uncompressed_length) {
+                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
+                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
+                try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
+                if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
                     return error.InvalidObject;
                 }
-
+                pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes());
+                try pack_buffered_reader.flush();
                 try pending_deltas.append(allocator, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32.final(),
+                    .crc32 = entry_crc32_writer.hasher.final(),
                 });
             },
         }
     }
 
-    const pack_checksum = pack_hash.finalResult();
+    const pack_checksum = pack_hashed_writer.hasher.finalResult();
     const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
     if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
         return error.CorruptedPack;
     }
```
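For readers skimming the diff: the deleted `AccumulatingReader` kept raw pack bytes around so they could be re-read into the SHA-1 and CRC32 hashers after the fact, with manual `readDataExcept`/`discard`/`rewind` bookkeeping at every step. The replacement hangs a writer chain (CRC32 `hashedWriter` for per-entry checksums, feeding a SHA-1 `hashedWriter` for the pack checksum, feeding a `countingWriter` for entry offsets) off a buffered tee, so all of that happens as a side effect of reading. Note that `std.io.bufferedTee` with its `putBack`/`flush` appears to come from the BufferedTee change set this commit belongs to, not from long-standing std APIs, while `countingReader`, `countingWriter`, `hashedWriter`, and `LinearFifo.pump` are existing std helpers. Below is a minimal sketch of the tee idea using a hypothetical `TeeReader`; unlike the real BufferedTee it has no internal buffer, so it cannot take back bytes a zlib decompressor over-read:

```zig
const std = @import("std");

/// Hypothetical minimal tee reader: every byte read through `reader()` is
/// also written to `out` (e.g. a hasher chain). The BufferedTee used in the
/// commit additionally buffers output and supports `putBack`/`flush`, so
/// bytes over-read by a decompressor are never hashed prematurely.
fn TeeReader(comptime ReaderType: type, comptime WriterType: type) type {
    return struct {
        child_reader: ReaderType,
        out: WriterType,

        const Self = @This();
        const Error = ReaderType.Error || WriterType.Error;
        const Reader = std.io.Reader(*Self, Error, read);

        fn read(self: *Self, buf: []u8) Error!usize {
            const count = try self.child_reader.read(buf);
            // Mirror the bytes the consumer just read into the side writer.
            try self.out.writeAll(buf[0..count]);
            return count;
        }

        fn reader(self: *Self) Reader {
            return .{ .context = self };
        }
    };
}

fn teeReader(child_reader: anytype, out: anytype) TeeReader(@TypeOf(child_reader), @TypeOf(out)) {
    return .{ .child_reader = child_reader, .out = out };
}

test "checksum pack bytes as a side effect of reading" {
    var input = std.io.fixedBufferStream("PACK\x00\x00\x00\x02");

    // Same chain shape as the commit: count the bytes, then SHA-1 them.
    var counting = std.io.countingWriter(std.io.null_writer);
    var hashed = std.compress.hashedWriter(counting.writer(), std.crypto.hash.Sha1.init(.{}));
    var tee = teeReader(input.reader(), hashed.writer());

    var header: [8]u8 = undefined;
    try tee.reader().readNoEof(&header);

    try std.testing.expectEqual(@as(u64, 8), counting.bytes_written);
    _ = hashed.hasher.finalResult(); // running pack checksum
}
```

The ordering of the chain in the commit matters: every teed byte passes through all three writers, and the CRC32 hasher sits closest to the tee only so that its state can be reset at each entry boundary while the SHA-1 and byte count keep accumulating across the whole pack.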
