-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Fix: after pausing stdin, a subprocess should be able to read from stdin #23341
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
efb9a1f
14360ee
77f9062
9e4e3ae
9d7aa47
ae782bc
d629c24
7d8e99d
bb79ed0
7aeec9a
6ac30c8
88f6936
32de07d
d227cc7
847c870
4a31b21
80064c1
b7db557
5d0165e
56b3e3b
727d1d4
a57f9e7
3ea42f2
114fe13
b06eaa7
f98fefe
119ead6
5e6219f
b22b023
b81c581
eb8717e
f59d15b
596f9a5
ad7dd93
5fd2d04
1337061
1554633
4c7669d
97e1844
fe724c6
068ef9f
114af68
71107b4
114e8b9
cd0e493
d048579
621cd6d
60995f6
47c3c89
336538e
671cc3f
90e0d97
77661e3
5860414
89e6db6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -397,7 +397,9 @@ const StreamTransfer = struct { | |
log("max_size reached, ending stream", .{}); | ||
if (this.route.server) |server| { | ||
// dont need to ref because we are already holding a ref and will be derefed in onReaderDone | ||
this.reader.pause(); | ||
if (!bun.Environment.isPosix) { | ||
this.reader.pause(); | ||
} | ||
// we cannot free inside onReadChunk this would be UAF so we schedule it to be done in the next event loop tick | ||
this.eof_task = jsc.AnyTask.New(StreamTransfer, StreamTransfer.onReaderDone).init(this); | ||
server.vm().enqueueTask(jsc.Task.init(&this.eof_task.?)); | ||
|
@@ -430,7 +432,9 @@ const StreamTransfer = struct { | |
// pause the reader so deref until onWritable | ||
defer this.deref(); | ||
this.resp.onWritable(*StreamTransfer, onWritable, this); | ||
this.reader.pause(); | ||
if (!bun.Environment.isPosix) { | ||
this.reader.pause(); | ||
} | ||
Comment on lines
+435
to
+437
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Document why pause is Windows-only. Same as the max_size scenario: the platform-specific pause behavior lacks documentation. Consider adding a comment explaining the flow-control difference between POSIX and Windows. Apply this diff to add clarifying documentation: switch (this.resp.write(chunk)) {
.backpressure => {
// pause the reader so deref until onWritable
defer this.deref();
this.resp.onWritable(*StreamTransfer, onWritable, this);
+ // On Windows, explicitly pause to stop reading
+ // On POSIX, flow control is managed by the new setFlowing mechanism
if (!bun.Environment.isPosix) {
this.reader.pause();
} 🤖 Prompt for AI Agents
|
||
return false; | ||
}, | ||
.want_more => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ lazy: Lazy = .{ .none = {} }, | |
buffered: std.ArrayListUnmanaged(u8) = .{}, | ||
read_inside_on_pull: ReadDuringJSOnPullResult = .{ .none = {} }, | ||
highwater_mark: usize = 16384, | ||
flowing: bool = true, | ||
|
||
pub const IOReader = bun.io.BufferedReader; | ||
pub const Poll = IOReader; | ||
|
@@ -487,6 +488,14 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul | |
} | ||
|
||
if (!this.reader.hasPendingRead()) { | ||
// If not flowing (paused), don't initiate new reads | ||
if (!this.flowing) { | ||
log("onPull({d}) = pending (not flowing)", .{buffer.len}); | ||
this.pending_value.set(this.parent().globalThis, array); | ||
this.pending_view = buffer; | ||
return .{ .pending = &this.pending }; | ||
} | ||
|
||
this.read_inside_on_pull = .{ .js = buffer }; | ||
this.reader.read(); | ||
|
||
|
@@ -581,32 +590,15 @@ pub fn onReaderDone(this: *FileReader) void { | |
} | ||
this.buffered = .{}; | ||
this.pending.run(); | ||
} else if (this.buffered.items.len > 0) { | ||
const this_value = this.parent().this_jsvalue; | ||
const globalThis = this.parent().globalThis; | ||
if (this_value != .zero) { | ||
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| { | ||
const buffered = this.buffered; | ||
this.buffered = .{}; | ||
this.parent().incrementCount(); | ||
defer _ = this.parent().decrementCount(); | ||
this.eventLoop().js.runCallback( | ||
cb, | ||
globalThis, | ||
.js_undefined, | ||
&.{ | ||
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| { | ||
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } }; | ||
return; | ||
}, | ||
}, | ||
); | ||
} | ||
} | ||
} | ||
// Don't handle buffered data here - it will be returned on the next onPull | ||
// This ensures proper ordering of chunks | ||
} | ||
|
||
this.parent().onClose(); | ||
// Only close the stream if there's no buffered data left to deliver | ||
if (this.buffered.items.len == 0) { | ||
this.parent().onClose(); | ||
} | ||
if (this.waiting_for_onReaderDone) { | ||
Comment on lines
+594
to
602
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainonClose may never fire if buffered > 0 at EOF Calling onClose only when buffered is empty risks never invoking the close handler because there's no later point that calls parent().onClose() after buffered data is delivered. This can break consumer onclose handlers and leak lifecycle signals. Apply this minimal fix to always signal close when the reader is done (ordering-sensitive consumers can still rely on the “done” result ordering): - // Only close the stream if there's no buffered data left to deliver
- if (this.buffered.items.len == 0) {
- this.parent().onClose();
- }
+ // Signal close exactly once when the reader is done.
+ // Ordering of data vs. close is handled at the JS layer via result flags.
+ this.parent().onClose(); If strict “data then close” ordering is required, alternatively queue onClose on the event loop after pending.run() resolves. Can you confirm whether any tests depend on close firing strictly after the last chunk? If so, I can propose an event‑loop queued onClose to preserve ordering. 🏁 Script executed: #!/bin/bash
set -e
rg -n 'waiting_for_onReaderDone' -C3 src/bun.js/webcore/FileReader.zig
echo "-----"
rg -n 'onClose' -C3 src/bun.js/webcore/FileReader.zig Length of output: 1332 #!/bin/bash
set -e
sed -n '500,650p' src/bun.js/webcore/FileReader.zig 🏁 Script executed: sed -n '550,650p' src/bun.js/webcore/FileReader.zig Length of output: 2932 🏁 Script executed: rg -n 'fn onPull' -C5 src/bun.js/webcore/FileReader.zig Length of output: 350 Always signal close when reader is done Guarding 🤖 Prompt for AI Agents
|
||
this.waiting_for_onReaderDone = false; | ||
_ = this.parent().decrementCount(); | ||
|
@@ -631,6 +623,26 @@ pub fn setRawMode(this: *FileReader, flag: bool) bun.sys.Maybe(void) { | |
return this.reader.setRawMode(flag); | ||
} | ||
|
||
pub fn setFlowing(this: *FileReader, flag: bool) void { | ||
log("setFlowing({}) was={}", .{ flag, this.flowing }); | ||
|
||
if (this.flowing == flag) { | ||
return; | ||
} | ||
|
||
this.flowing = flag; | ||
|
||
if (flag) { | ||
this.reader.unpause(); | ||
if (!this.reader.isDone() and !this.reader.hasPendingRead()) { | ||
// Kick off a new read if needed | ||
this.reader.read(); | ||
} | ||
} else { | ||
this.reader.pause(); | ||
} | ||
} | ||
|
||
pub fn memoryCost(this: *const FileReader) usize { | ||
// ReadableStreamSource covers @sizeOf(FileReader) | ||
return this.reader.memoryCost() + this.buffered.capacity; | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -577,6 +577,19 @@ pub fn NewSource( | |||||||||||||||||||||||||||||||||||||||||||||||||||
@compileError("setRawMode is not implemented on " ++ @typeName(Context)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
pub fn setFlowingFromJS(this: *ReadableStreamSourceType, _: *jsc.JSGlobalObject, call_frame: *jsc.CallFrame) bun.JSError!JSValue { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if (@hasDecl(Context, "setFlowing")) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
const flag = call_frame.argument(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if (Environment.allow_assert) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
bun.assert(flag.isBoolean()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
this.context.setFlowing(flag == .true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
return .js_undefined; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
return .js_undefined; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+580
to
+591
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Mirror setRawModeFromJS: mark binding, set this, and compile‑time guard Add markBinding and cache this for parity; prefer a compile‑time error if Context lacks setFlowing (like setRawMode). Apply: - pub fn setFlowingFromJS(this: *ReadableStreamSourceType, _: *jsc.JSGlobalObject, call_frame: *jsc.CallFrame) bun.JSError!JSValue {
- if (@hasDecl(Context, "setFlowing")) {
+ pub fn setFlowingFromJS(this: *ReadableStreamSourceType, global: *jsc.JSGlobalObject, call_frame: *jsc.CallFrame) bun.JSError!JSValue {
+ jsc.markBinding(@src());
+ this.this_jsvalue = call_frame.this();
+ if (@hasDecl(Context, "setFlowing")) {
const flag = call_frame.argument(0);
if (Environment.allow_assert) {
bun.assert(flag.isBoolean());
}
this.context.setFlowing(flag == .true);
return .js_undefined;
}
-
- return .js_undefined;
+ @compileError("setFlowing is not implemented on " ++ @typeName(Context));
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
const supports_ref = setRefUnrefFn != null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
pub const js = @field(jsc.Codegen, "JS" ++ name_ ++ "InternalReadableStreamSource"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Document why pause is Windows-only.
The POSIX guard on
reader.pause()
suggests platform-specific flow-control semantics, but the rationale isn't documented. Consider adding a brief comment explaining why pause is only needed on non-POSIX platforms when max_size is reached.Apply this diff to add clarifying documentation:
📝 Committable suggestion