From 6e07f9477c863d4024beafccada96cbe389ee9cc Mon Sep 17 00:00:00 2001 From: Dylan Conway <35280289+dylan-conway@users.noreply.github.com> Date: Mon, 1 Apr 2024 05:23:47 -0700 Subject: [PATCH] fix(streams): don't lose bytes on drain (#9768) * fix * clear * update * test * fix test --------- Co-authored-by: Jarred Sumner --- src/bun.js/webcore/streams.zig | 15 ++++++++++++++- src/io/PipeReader.zig | 2 +- src/js/builtins/ReadableStreamInternals.ts | 2 +- test/js/bun/spawn/spawn.test.ts | 14 ++++++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 7403ec591ab75..4d181b0c4fde7 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -3749,6 +3749,9 @@ pub const FileReader = struct { if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) { if (this.reader.isDone()) { + if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) { + this.reader.buffer().* = std.ArrayList(u8).init(bun.default_allocator); + } this.pending.result = .{ .temporary_and_done = bun.ByteList.init(buf), }; @@ -3756,6 +3759,10 @@ pub const FileReader = struct { this.pending.result = .{ .temporary = bun.ByteList.init(buf), }; + + if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) { + this.reader.buffer().clearRetainingCapacity(); + } } this.pending_value.clear(); @@ -3780,6 +3787,9 @@ pub const FileReader = struct { return !was_done; } else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) { this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory(); + if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) { + this.reader.buffer().clearRetainingCapacity(); + } } // For pipes, we have to keep pulling or the other process will block. @@ -3886,6 +3896,9 @@ pub const FileReader = struct { if (this.buffered.items.len > 0) { const out = bun.ByteList.init(this.buffered.items); this.buffered = .{}; + if (comptime Environment.allow_assert) { + std.debug.assert(this.reader.buffer().items.ptr != out.ptr); + } return out; } @@ -3893,7 +3906,7 @@ pub const FileReader = struct { return .{}; } - const out = this.reader.buffer(); + const out = this.reader.buffer().*; this.reader.buffer().* = std.ArrayList(u8).init(bun.default_allocator); return bun.ByteList.fromList(out); } diff --git a/src/io/PipeReader.zig b/src/io/PipeReader.zig index 5fd76d7725136..8192033d5547b 100644 --- a/src/io/PipeReader.zig +++ b/src/io/PipeReader.zig @@ -339,7 +339,7 @@ pub fn WindowsPipeReader( var this = bun.cast(*This, stream.data); const nread_int = nread.int(); - bun.sys.syslog("onStreamRead() = {d}", .{nread_int}); + bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int }); // NOTE: pipes/tty need to call stopReading on errors (yeah) switch (nread_int) { diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index 5b81edd9b3221..f693fe8f35e03 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -1656,7 +1656,7 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) { $assert(controller, "controller is missing"); if (result && $isPromise(result)) { - return result.then( + return result.$then( handleNativeReadableStreamPromiseResult.bind({ c: controller, v: view, diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index 81e8a31f03f92..aaca1957facfd 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -490,6 +490,20 @@ for (let [gcTick, label] of [ }); }); } + + it("should allow reading stdout after a few milliseconds", async () => { + for (let i = 0; i < 50; i++) { + const proc = Bun.spawn({ + cmd: ["git", "--version"], + stdout: "pipe", + stderr: "ignore", + stdin: "ignore", + }); + await Bun.sleep(1); + const out = await Bun.readableStreamToText(proc.stdout); + expect(out).not.toBe(""); + } + }); }); it("throws errors for invalid arguments", async () => {