Skip to content

Commit

Permalink
fix(streams): don't lose bytes on drain (#9768)
Browse files Browse the repository at this point in the history
* fix

* clear

* update

* test

* fix test

---------

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
  • Loading branch information
dylan-conway and Jarred-Sumner authored Apr 1, 2024
1 parent 2dd2fc6 commit 6e07f94
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
15 changes: 14 additions & 1 deletion src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3749,13 +3749,20 @@ pub const FileReader = struct {

if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
if (this.reader.isDone()) {
if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) {
this.reader.buffer().* = std.ArrayList(u8).init(bun.default_allocator);
}
this.pending.result = .{
.temporary_and_done = bun.ByteList.init(buf),
};
} else {
this.pending.result = .{
.temporary = bun.ByteList.init(buf),
};

if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) {
this.reader.buffer().clearRetainingCapacity();
}
}

this.pending_value.clear();
Expand All @@ -3780,6 +3787,9 @@ pub const FileReader = struct {
return !was_done;
} else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
this.buffered.appendSlice(bun.default_allocator, buf) catch bun.outOfMemory();
if (bun.isSliceInBuffer(buf, this.reader.buffer().allocatedSlice())) {
this.reader.buffer().clearRetainingCapacity();
}
}

// For pipes, we have to keep pulling or the other process will block.
Expand Down Expand Up @@ -3886,14 +3896,17 @@ pub const FileReader = struct {
if (this.buffered.items.len > 0) {
const out = bun.ByteList.init(this.buffered.items);
this.buffered = .{};
if (comptime Environment.allow_assert) {
std.debug.assert(this.reader.buffer().items.ptr != out.ptr);
}
return out;
}

if (this.reader.hasPendingRead()) {
return .{};
}

const out = this.reader.buffer();
const out = this.reader.buffer().*;
this.reader.buffer().* = std.ArrayList(u8).init(bun.default_allocator);
return bun.ByteList.fromList(out);
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/PipeReader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ pub fn WindowsPipeReader(
var this = bun.cast(*This, stream.data);

const nread_int = nread.int();
bun.sys.syslog("onStreamRead() = {d}", .{nread_int});
bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int });

// NOTE: pipes/tty need to call stopReading on errors (yeah)
switch (nread_int) {
Expand Down
2 changes: 1 addition & 1 deletion src/js/builtins/ReadableStreamInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ export function lazyLoadStream(stream, autoAllocateChunkSize) {
$assert(controller, "controller is missing");

if (result && $isPromise(result)) {
return result.then(
return result.$then(
handleNativeReadableStreamPromiseResult.bind({
c: controller,
v: view,
Expand Down
14 changes: 14 additions & 0 deletions test/js/bun/spawn/spawn.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,20 @@ for (let [gcTick, label] of [
});
});
}

it("should allow reading stdout after a few milliseconds", async () => {
for (let i = 0; i < 50; i++) {
const proc = Bun.spawn({
cmd: ["git", "--version"],
stdout: "pipe",
stderr: "ignore",
stdin: "ignore",
});
await Bun.sleep(1);
const out = await Bun.readableStreamToText(proc.stdout);
expect(out).not.toBe("");
}
});
});

it("throws errors for invalid arguments", async () => {
Expand Down

0 comments on commit 6e07f94

Please sign in to comment.