Skip to content

Commit 0738a2b

Browse files
ronagTrott
authored andcommitted
stream: finished should error on errored stream
Calling finished before or after a stream has errored or closed should end up with the same behavior. PR-URL: #39235 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 68548fd commit 0738a2b

File tree

4 files changed

+64
-26
lines changed

4 files changed

+64
-26
lines changed

lib/_http_client.js

+3
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,9 @@ function onSocketNT(req, socket, err) {
806806
socket.emit('free');
807807
} else {
808808
finished(socket.destroy(err || req[kError]), (er) => {
809+
if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
810+
er = null;
811+
}
809812
_destroy(req, er || err);
810813
});
811814
return;

lib/_http_incoming.js

+3
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
188188
if (this.socket && !this.socket.destroyed && this.aborted) {
189189
this.socket.destroy(err);
190190
const cleanup = finished(this.socket, (e) => {
191+
if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
192+
e = null;
193+
}
191194
cleanup();
192195
process.nextTick(onError, this, e || err, cb);
193196
});

lib/internal/streams/end-of-stream.js

+35-26
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ function eos(stream, options, callback) {
9898
isWritable(stream) === writable
9999
);
100100

101-
let writableFinished = stream.writableFinished ||
102-
(wState && wState.finished);
101+
let writableFinished = stream.writableFinished || wState?.finished;
103102
const onfinish = () => {
104103
writableFinished = true;
105104
// Stream should not be destroyed here. If it is that
@@ -111,8 +110,7 @@ function eos(stream, options, callback) {
111110
if (!readable || readableEnded) callback.call(stream);
112111
};
113112

114-
let readableEnded = stream.readableEnded ||
115-
(rState && rState.endEmitted);
113+
let readableEnded = stream.readableEnded || rState?.endEmitted;
116114
const onend = () => {
117115
readableEnded = true;
118116
// Stream should not be destroyed here. If it is that
@@ -128,7 +126,17 @@ function eos(stream, options, callback) {
128126
callback.call(stream, err);
129127
};
130128

129+
let closed = wState?.closed || rState?.closed;
130+
131131
const onclose = () => {
132+
closed = true;
133+
134+
const errored = wState?.errored || rState?.errored;
135+
136+
if (errored && typeof errored !== 'boolean') {
137+
return callback.call(stream, errored);
138+
}
139+
132140
if (readable && !readableEnded) {
133141
if (!isReadableEnded(stream))
134142
return callback.call(stream,
@@ -139,6 +147,7 @@ function eos(stream, options, callback) {
139147
return callback.call(stream,
140148
new ERR_STREAM_PREMATURE_CLOSE());
141149
}
150+
142151
callback.call(stream);
143152
};
144153

@@ -168,29 +177,29 @@ function eos(stream, options, callback) {
168177
if (options.error !== false) stream.on('error', onerror);
169178
stream.on('close', onclose);
170179

171-
// _closed is for OutgoingMessage which is not a proper Writable.
172-
const closed = (!wState && !rState && stream._closed === true) || (
173-
(wState && wState.closed) ||
174-
(rState && rState.closed) ||
175-
(wState && wState.errorEmitted) ||
176-
(rState && rState.errorEmitted) ||
177-
(rState && stream.req && stream.aborted) ||
178-
(
179-
(!writable || (wState && wState.finished)) &&
180-
(!readable || (rState && rState.endEmitted))
181-
)
182-
);
183-
184180
if (closed) {
185-
// TODO(ronag): Re-throw error if errorEmitted?
186-
// TODO(ronag): Throw premature close as if finished was called?
187-
// before being closed? i.e. if closed but not errored, ended or finished.
188-
// TODO(ronag): Throw some kind of error? Does it make sense
189-
// to call finished() on a "finished" stream?
190-
// TODO(ronag): willEmitClose?
191-
process.nextTick(() => {
192-
callback();
193-
});
181+
process.nextTick(onclose);
182+
} else if (wState?.errorEmitted || rState?.errorEmitted) {
183+
if (!willEmitClose) {
184+
process.nextTick(onclose);
185+
}
186+
} else if (
187+
!readable &&
188+
(!willEmitClose || stream.readable) &&
189+
writableFinished
190+
) {
191+
process.nextTick(onclose);
192+
} else if (
193+
!writable &&
194+
(!willEmitClose || stream.writable) &&
195+
readableEnded
196+
) {
197+
process.nextTick(onclose);
198+
} else if (!wState && !rState && stream._closed === true) {
199+
// _closed is for OutgoingMessage which is not a proper Writable.
200+
process.nextTick(onclose);
201+
} else if ((rState && stream.req && stream.aborted)) {
202+
process.nextTick(onclose);
194203
}
195204

196205
const cleanup = () => {

test/parallel/test-stream-finished.js

+23
Original file line numberDiff line numberDiff line change
@@ -608,3 +608,26 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
608608
assert.strictEqual(closed, true);
609609
}));
610610
}
611+
612+
{
613+
const w = new Writable();
614+
const _err = new Error();
615+
w.destroy(_err);
616+
finished(w, common.mustCall((err) => {
617+
assert.strictEqual(_err, err);
618+
finished(w, common.mustCall((err) => {
619+
assert.strictEqual(_err, err);
620+
}));
621+
}));
622+
}
623+
624+
{
625+
const w = new Writable();
626+
w.destroy();
627+
finished(w, common.mustCall((err) => {
628+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
629+
finished(w, common.mustCall((err) => {
630+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
631+
}));
632+
}));
633+
}

0 commit comments

Comments
 (0)