diff --git a/source/as-stream.ts b/source/as-stream.ts index 37da592ff..7a818d8e1 100644 --- a/source/as-stream.ts +++ b/source/as-stream.ts @@ -81,7 +81,7 @@ export default function asStream(options: NormalizedOptions): ProxyStream response, output, error => { - if (error) { + if (error && error.message !== 'Premature close') { emitError(new ReadError(error, options)); } } diff --git a/source/request-as-event-emitter.ts b/source/request-as-event-emitter.ts index 76fa850d6..49e18dbdc 100644 --- a/source/request-as-event-emitter.ts +++ b/source/request-as-event-emitter.ts @@ -34,6 +34,9 @@ export default (options: NormalizedOptions): RequestAsEventEmitter => { let currentRequest: http.ClientRequest; + // `request.aborted` is a boolean since v11.0.0: https://github.com/nodejs/node/commit/4b00c4fafaa2ae8c41c1f78823c0feb810ae4723#diff-e3bc37430eb078ccbafe3aa3b570c91a + const isAborted = (): boolean => typeof currentRequest.aborted === 'number' || (currentRequest.aborted as unknown as boolean); + const emitError = async (error: GeneralError): Promise => { try { for (const hook of options.hooks.beforeError) { @@ -135,15 +138,27 @@ export default (options: NormalizedOptions): RequestAsEventEmitter => { return; } - await getResponse(typedResponse, options, emitter); + try { + await getResponse(typedResponse, options, emitter); + } catch (error) { + // Don't throw `Premature close` if the request has been aborted + if (!(isAborted() && error.message === 'Premature close')) { + throw error; + } + } } catch (error) { emitError(error); } }; const handleRequest = async (request: http.ClientRequest): Promise => { - // `request.aborted` is a boolean since v11.0.0: https://github.com/nodejs/node/commit/4b00c4fafaa2ae8c41c1f78823c0feb810ae4723#diff-e3bc37430eb078ccbafe3aa3b570c91a - const isAborted = (): boolean => typeof request.aborted === 'number' || (request.aborted as unknown as boolean); + let piped = false; + let finished = false; + + // `request.finished` doesn't indicate whether this has been emitted or not + request.once('finish', () => { + finished = true; + }); currentRequest = request; @@ -159,16 +174,21 @@ export default (options: NormalizedOptions): RequestAsEventEmitter => { } }; - const attachErrorHandler = (): void => { - request.once('error', error => { - // We need to allow `TimedOutTimeoutError` here, because `stream.pipeline(…)` aborts the request automatically. + request.on('error', error => { + if (piped) { + // Check if it's caught by `stream.pipeline(...)` + if (!finished) { + return; + } + + // We need to let `TimedOutTimeoutError` through, because `stream.pipeline(…)` aborts the request automatically. if (isAborted() && !(error instanceof TimedOutTimeoutError)) { return; } + } - onError(error); - }); - }; + onError(error); + }); try { timer(request); @@ -178,14 +198,14 @@ export default (options: NormalizedOptions): RequestAsEventEmitter => { const uploadStream = createProgressStream('uploadProgress', emitter, httpOptions.headers!['content-length'] as string); + piped = true; + await pipeline( httpOptions.body!, uploadStream, request ); - attachErrorHandler(); - request.emit('upload-complete'); } catch (error) { if (isAborted() && error.message === 'Premature close') { @@ -194,9 +214,6 @@ export default (options: NormalizedOptions): RequestAsEventEmitter => { } onError(error); - - // Handle future errors - attachErrorHandler(); } }; diff --git a/source/utils/timed-out.ts b/source/utils/timed-out.ts index f480df4c2..68cae7109 100644 --- a/source/utils/timed-out.ts +++ b/source/utils/timed-out.ts @@ -84,6 +84,8 @@ export default (request: ClientRequest, delays: Delays, options: TimedOutOptions } }); + request.once('abort', cancelTimeouts); + once(request, 'response', (response: IncomingMessage): void => { once(response, 'end', cancelTimeouts); }); diff --git a/test/stream.ts b/test/stream.ts index bbc249e85..1c376cd5f 100644 --- a/test/stream.ts +++ b/test/stream.ts @@ -261,7 +261,7 @@ test('proxies `content-encoding` header when `options.decompress` is false', wit t.is(headers['content-encoding'], 'gzip'); }); -test('destroying got.stream() cancels the request', withServer, async (t, server, got) => { +test('destroying got.stream() cancels the request - `request` event', withServer, async (t, server, got) => { server.get('/', defaultHandler); const stream = got.stream(''); @@ -270,6 +270,18 @@ test('destroying got.stream() cancels the request', withServer, async (t, server t.truthy(request.aborted); }); +test('destroying got.stream() cancels the request - `response` event', withServer, async (t, server, got) => { + server.get('/', (_request, response) => { + response.write('hello'); + }); + + const stream = got.stream(''); + const request = await pEvent(stream, 'request'); + await pEvent(stream, 'response'); + stream.destroy(); + t.truthy(request.aborted); +}); + test('piping to got.stream.put()', withServer, async (t, server, got) => { server.get('/', defaultHandler); server.put('/post', postHandler); diff --git a/test/timeout.ts b/test/timeout.ts index 25efc333d..5dd9fe259 100644 --- a/test/timeout.ts +++ b/test/timeout.ts @@ -442,12 +442,38 @@ test.serial('no unhandled timeout errors', withServer, async (t, _server, got) = return result; } - })); + }), 'socket hang up'); await delay(200); }); -test.serial('no more timeouts after an error', withServerAndLolex, async (t, _server, got, clock) => { +test.serial('no more timeouts after an error', withServer, async (t, _server, got) => { + const {setTimeout} = global; + const {clearTimeout} = global; + + // @ts-ignore + global.setTimeout = (callback, _ms, ...args) => { + const timeout = { + cleared: false + }; + + process.nextTick(() => { + if (timeout.cleared) { + return; + } + + callback(...args); + }); + + return timeout; + }; + + // @ts-ignore + global.clearTimeout = timeout => { + // @ts-ignore + timeout.cleared = true; + }; + await t.throwsAsync(got(`http://${Date.now()}.dev`, { retry: 1, timeout: { @@ -459,20 +485,12 @@ test.serial('no more timeouts after an error', withServerAndLolex, async (t, _se send: 1, request: 1 } - }).on('request', () => { - const {setTimeout} = global; - // @ts-ignore Augmenting global for testing purposes - global.setTimeout = (callback, _ms, ...args) => { - callback(...args); - - global.setTimeout = setTimeout; - }; + }), {instanceOf: got.TimeoutError}); - clock.runAll(); - }), {instanceOf: got.GotError}); + await delay(100); - // Wait a bit more to check if there are any unhandled errors - clock.tick(100); + global.setTimeout = setTimeout; + global.clearTimeout = clearTimeout; }); test.serial('socket timeout is canceled on error', withServerAndLolex, async (t, _server, got, clock) => { @@ -564,3 +582,44 @@ test.serial('doesn\'t throw on early lookup', withServerAndLolex, async (t, serv } })); }); + +// TODO: use lolex here +test.serial('no unhandled `Premature close` error', withServer, async (t, server, got) => { + server.get('/', async (_request, response) => { + response.write('hello'); + + await delay(10); + response.end(); + }); + + await t.throwsAsync(got({ + timeout: 10, + retry: 0 + }), 'Timeout awaiting \'request\' for 10ms'); + + await delay(20); +}); + +// TODO: use lolex here +test.serial('cancelling the request removes timeouts', withServer, async (t, server, got) => { + server.get('/', (_request, response) => { + response.write('hello'); + }); + + const promise = got({ + timeout: 500, + retry: 0 + }).on('downloadProgress', () => { + promise.cancel(); + }).on('request', request => { + request.on('error', error => { + if (error.message === 'Timeout awaiting \'request\' for 500ms') { + t.fail(error.message); + } + }); + }); + + await t.throwsAsync(promise, 'Promise was canceled'); + + await delay(1000); +});