-
-
Notifications
You must be signed in to change notification settings - Fork 31.7k
[x] http: client destroy stream #29192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
|
||
const { Object } = primordials; | ||
|
||
const { destroy, kWritableState } = require('internal/streams/destroy'); | ||
const net = require('net'); | ||
const url = require('url'); | ||
const assert = require('internal/assert'); | ||
|
@@ -190,14 +191,24 @@ function ClientRequest(input, options, cb) { | |
|
||
this._ended = false; | ||
this.res = null; | ||
// The aborted property is for backwards compat and is not | ||
// strictly the same as destroyed as it can be written | ||
// to by the user. | ||
this.aborted = false; | ||
this.timeoutCb = null; | ||
this.upgradeOrConnect = false; | ||
this.parser = null; | ||
this.maxHeadersCount = null; | ||
this.reusedSocket = false; | ||
|
||
let called = false; | ||
// Used by destroyImpl. | ||
this[kWritableState] = { | ||
errorEmitted: false, | ||
destroyed: false, | ||
emitClose: true, | ||
socketPending: true, | ||
destroyCallback: null | ||
}; | ||
ronag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if (this.agent) { | ||
// If there is an agent we should default to Connection:keep-alive, | ||
|
@@ -261,11 +272,17 @@ function ClientRequest(input, options, cb) { | |
} | ||
|
||
const oncreate = (err, socket) => { | ||
if (called) | ||
const state = this[kWritableState]; | ||
if (!state.socketPending) | ||
return; | ||
called = true; | ||
state.socketPending = false; | ||
if (err) { | ||
process.nextTick(() => this.emit('error', err)); | ||
const { destroyCallback: cb } = state; | ||
if (cb) { | ||
cb(err); | ||
} else { | ||
this.destroy(err); | ||
} | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return; | ||
} | ||
this.onSocket(socket); | ||
|
@@ -280,9 +297,10 @@ function ClientRequest(input, options, cb) { | |
this._last = true; | ||
this.shouldKeepAlive = false; | ||
if (typeof options.createConnection === 'function') { | ||
const state = this[kWritableState]; | ||
const newSocket = options.createConnection(options, oncreate); | ||
if (newSocket && !called) { | ||
called = true; | ||
if (newSocket && state.socketPending) { | ||
state.socketPending = false; | ||
this.onSocket(newSocket); | ||
} else { | ||
return; | ||
|
@@ -298,6 +316,12 @@ function ClientRequest(input, options, cb) { | |
Object.setPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype); | ||
Object.setPrototypeOf(ClientRequest, OutgoingMessage); | ||
|
||
Object.defineProperty(ClientRequest.prototype, 'destroyed', { | ||
get() { | ||
return this[kWritableState].destroyed; | ||
} | ||
}); | ||
|
||
ClientRequest.prototype._finish = function _finish() { | ||
DTRACE_HTTP_CLIENT_REQUEST(this, this.socket); | ||
OutgoingMessage.prototype._finish.call(this); | ||
|
@@ -311,28 +335,37 @@ ClientRequest.prototype._implicitHeader = function _implicitHeader() { | |
this[kOutHeaders]); | ||
}; | ||
|
||
ClientRequest.prototype.abort = function abort() { | ||
if (!this.aborted) { | ||
process.nextTick(emitAbortNT.bind(this)); | ||
} | ||
ClientRequest.prototype.destroy = destroy; | ||
ClientRequest.prototype._destroy = function(err, cb) { | ||
this.aborted = true; | ||
process.nextTick(emitAbortNT, this); | ||
|
||
// If we're aborting, we don't care about any more response data. | ||
if (this.res) { | ||
this.res._dump(); | ||
} | ||
|
||
// In the event that we don't have a socket, we will pop out of | ||
// the request queue through handling in onSocket. | ||
if (this.socket) { | ||
if (this.upgradeOrConnect) { | ||
// We're detached from socket. | ||
cb(err); | ||
} else if (this.socket) { | ||
// in-progress | ||
this.socket.destroy(); | ||
this.socket.destroy(err, cb); | ||
} else if (this[kWritableState].socketPending) { | ||
// In the event that we don't have a socket, we will pop out of | ||
// the request queue through handling in onSocket. | ||
this[kWritableState].destroyCallback = (er) => cb(er || err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better to wait for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That won't work, the socket event is not always emitted. See the error case in The error from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From a user perspective, I think we should be swallowing that error if Do we test that calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be further refactored if/once #29656 is merged. But it basically does the same thing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Test added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The possibility of deadlocks/having a callback not called in the destroy process makes me nervous. The current logic is significantly simpler, even if less correct, and I feel this might bite us in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's the way Node works? This is not the only case? What can we do to make you less nervous? More tests?
I strongly disagree with this. We have similar scenarios in a lot of different places and I have pending PR's (e.g. #29656) with efforts on this. If this (swallow error) is the way we go then I'd like to ask #29197 to be reconsidered (to "no error after destroy") in order to achieve some form of consistency. Otherwise, we will never have "correct" code even in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No. We should strive to not break current users, and then consistency. Correctness it's extremely hard to measure because there are no specifications for any of this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How does this break current users? |
||
} else { | ||
cb(err); | ||
} | ||
}; | ||
|
||
ClientRequest.prototype.abort = function abort() { | ||
this.destroy(); | ||
}; | ||
|
||
function emitAbortNT() { | ||
this.emit('abort'); | ||
function emitAbortNT(self) { | ||
self.emit('abort'); | ||
} | ||
|
||
function ondrain() { | ||
|
@@ -363,24 +396,23 @@ function socketCloseListener() { | |
res.aborted = true; | ||
res.emit('aborted'); | ||
} | ||
req.emit('close'); | ||
if (!res.aborted && res.readable) { | ||
res.on('end', function() { | ||
// We can only destroy req after 'end'. Otherwise we will dump the | ||
// data. | ||
req.destroy(); | ||
this.emit('close'); | ||
}); | ||
res.push(null); | ||
} else { | ||
req.destroy(); | ||
res.emit('close'); | ||
} | ||
} else { | ||
if (!req.socket._hadError) { | ||
// This socket error fired before we started to | ||
// receive a response. The error needs to | ||
// fire on the request. | ||
req.socket._hadError = true; | ||
req.emit('error', connResetException('socket hang up')); | ||
} | ||
req.emit('close'); | ||
// This socket error fired before we started to | ||
// receive a response. The error needs to | ||
// fire on the request. | ||
req.destroy(connResetException('socket hang up')); | ||
} | ||
|
||
// Too bad. That output wasn't getting written. | ||
|
@@ -400,13 +432,6 @@ function socketErrorListener(err) { | |
const req = socket._httpMessage; | ||
debug('SOCKET ERROR:', err.message, err.stack); | ||
|
||
if (req) { | ||
// For Safety. Some additional errors might fire later on | ||
// and we need to make sure we don't double-fire the error event. | ||
req.socket._hadError = true; | ||
req.emit('error', err); | ||
} | ||
|
||
const parser = socket.parser; | ||
if (parser) { | ||
parser.finish(); | ||
|
@@ -416,7 +441,7 @@ function socketErrorListener(err) { | |
// Ensure that no further data will come out of the socket | ||
socket.removeListener('data', socketOnData); | ||
socket.removeListener('end', socketOnEnd); | ||
socket.destroy(); | ||
req.destroy(err); | ||
} | ||
|
||
function freeSocketErrorListener(err) { | ||
|
@@ -431,17 +456,15 @@ function socketOnEnd() { | |
const req = this._httpMessage; | ||
const parser = this.parser; | ||
|
||
if (!req.res && !req.socket._hadError) { | ||
// If we don't have a response then we know that the socket | ||
// ended prematurely and we need to emit an error on the request. | ||
req.socket._hadError = true; | ||
req.emit('error', connResetException('socket hang up')); | ||
} | ||
if (parser) { | ||
parser.finish(); | ||
freeParser(parser, req, socket); | ||
} | ||
socket.destroy(); | ||
|
||
// If we don't have a response then we know that the socket | ||
// ended prematurely and we need to emit an error on the request. | ||
const err = !req.res ? connResetException('socket hang up') : null; | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
req.destroy(err); | ||
} | ||
|
||
function socketOnData(d) { | ||
|
@@ -456,9 +479,7 @@ function socketOnData(d) { | |
prepareError(ret, parser, d); | ||
debug('parse error', ret); | ||
freeParser(parser, req, socket); | ||
socket.destroy(); | ||
req.socket._hadError = true; | ||
req.emit('error', ret); | ||
req.destroy(ret); | ||
} else if (parser.incoming && parser.incoming.upgrade) { | ||
// Upgrade (if status code 101) or CONNECT | ||
const bytesParsed = ret; | ||
|
@@ -490,10 +511,10 @@ function socketOnData(d) { | |
socket.readableFlowing = null; | ||
|
||
req.emit(eventName, res, socket, bodyHead); | ||
req.emit('close'); | ||
req.destroy(); | ||
} else { | ||
// Requested Upgrade or used CONNECT method, but have no handler. | ||
socket.destroy(); | ||
req.destroy(); | ||
} | ||
} else if (parser.incoming && parser.incoming.complete && | ||
// When the status code is informational (100, 102-199), | ||
|
@@ -582,7 +603,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) { | |
// If the user did not listen for the 'response' event, then they | ||
// can't possibly read the data, so we ._dump() it into the void | ||
// so that the socket doesn't hang there in a paused state. | ||
if (req.aborted || !req.emit('response', res)) | ||
if (req.destroyed || !req.emit('response', res)) | ||
res._dump(); | ||
|
||
if (method === 'HEAD') | ||
|
@@ -720,10 +741,11 @@ ClientRequest.prototype.onSocket = function onSocket(socket) { | |
}; | ||
|
||
function onSocketNT(req, socket) { | ||
if (req.aborted) { | ||
if (req.destroyed) { | ||
const { destroyCallback: cb } = req[kWritableState]; | ||
// If we were aborted while waiting for a socket, skip the whole thing. | ||
if (!req.agent) { | ||
socket.destroy(); | ||
socket.destroy(null, cb); | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
req.emit('close'); | ||
socket.emit('free'); | ||
|
Uh oh!
There was an error while loading. Please reload this page.