-
Notifications
You must be signed in to change notification settings - Fork 613
Description
Hello, I was in the process of implementing file uploading/body streaming into https://github.com/kartikk221/hyper-express and ran into an issue when attempting to stream file data using chunked encoding transfer. From messing with various different scenarios it seems uWebsockets.js instantly closes a request connection if it detects some body data is being sent with a request that has no content-length header. I have provided some context below for the implementation/test case in which bug occurs:
Implementation (/src/components/http/Request.js):
/**
* Initializes a readable stream which consumes body data from uWS.Response.onData() handler.
* Note! This stream will automatically handle backpressure by pausing/resuming the response data flow.
*
* @private
* @param {stream.ReadableOptions} options
*/
_start_streaming(options) {
// Throw an error if we try to streaming when one is already in process
if (this.#body_stream)
throw new Error(
'HyperExpress.Request._start_streaming() -> This method has been called more than once which should not occur.'
);
// Create a readable stream which pipes data from the uWS.Response.onData() handler
const readable = new stream.Readable(options);
// Bind a _read() handler to the readable to resume the uWS request
readable._read = () => this.resume();
// Bind a uWS.Response.onData() handler which will handle incoming chunks and pipe them to the readable stream
const reference = this;
this.#raw_response.onData((array_buffer, is_last) => {
// Do not process chunk if we can no longer pipe it to the readable stream
if (reference.#body_stream === null) return;
// Convert the ArrayBuffer to a Buffer reference
// Provide raw chunks if specified and we have something consuming stream already
// This will prevent unneccessary duplication of buffers
let buffer;
if (reference.#stream_raw_chunks && readable.listenerCount('data') > 0) {
// Store a direct Buffer reference as this will be immediately consumed
buffer = Buffer.from(array_buffer);
console.log('Raw Chunk ->', buffer.length);
} else {
// Store a copy of the array_buffer as we have no immediate consumer yet
// If we do not copy, this chunk will be lost in stream queue as it will be deallocated by uWebsockets
buffer = Buffer.concat([Buffer.from(array_buffer)]);
console.log('Copied Chunk ->', buffer.length);
}
// Push the incoming chunk into readable stream for consumption
// Pause the uWS request if our stream is backed up
if (!readable.push(buffer)) reference.pause();
// Push a null chunk signaling an EOF to the stream to end it if this chunk is last
if (is_last) {
console.log('EOF Chunk ->');
readable.push(null);
}
});
// Store the readable stream locally for consumption
this.#body_stream = readable;
}Test Case:
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
const { assert_log } = require('../../../scripts/operators.js');
const { HyperExpress, fetch, server } = require('../../../configuration.js');
const router = new HyperExpress.Router();
const endpoint = '/tests/request';
const scenario_endpoint = '/stream-pipe';
const endpoint_url = server.base + endpoint + scenario_endpoint;
const test_file_path = path.resolve(path.join(__dirname, '../../../content/large-image.jpg'));
const test_file_stats = fs.statSync(test_file_path);
function get_file_write_path(file_name) {
return path.resolve(path.join(__dirname, '../../../content/written/' + file_name));
}
// Create Backend HTTP Route
router.post(scenario_endpoint, async (request, response) => {
console.log(request.headers);
// Create a writable stream to specified file name path
const file_name = request.headers['x-file-name'];
const path = get_file_write_path(file_name);
const writable = fs.createWriteStream(path);
// Pipe the readable body stream to the writable and wait for it to finish
request.stream.pipe(writable);
await new Promise((resolve) => writable.once('finish', resolve));
// Read the written file's buffer and calculate its md5 hash
const written_buffer = fs.readFileSync(path);
const written_hash = crypto.createHash('md5').update(written_buffer).digest('hex');
// Cleanup the written file for future testing
fs.rmSync(path);
// Return the written hash to be validated on client side
return response.json({
hash: written_hash,
});
});
// Bind router to webserver
const { TEST_SERVER } = require('../../Server.js');
TEST_SERVER.use(endpoint, router);
async function test_request_stream_pipe() {
const group = 'REQUEST';
const candidate = 'HyperExpress.Request.stream';
// Send a buffer of the file in the request body so we have a content-length on server side
const expected_buffer = fs.readFileSync(test_file_path);
const expected_hash = crypto.createHash('md5').update(expected_buffer).digest('hex');
const buffer_upload_response = await fetch(endpoint_url, {
method: 'POST',
headers: {
'x-file-name': 'request_upload_buffer.jpg',
},
body: expected_buffer,
});
// Validate the hash uploaded on the server side with the expected hash from client side
const buffer_upload_body = await buffer_upload_response.json();
assert_log(
group,
`${candidate} Piped Upload With Content Length - ${expected_hash} === ${buffer_upload_body.hash} - ${test_file_stats.size} bytes`,
() => expected_hash === buffer_upload_body.hash
);
// Stream the file to send using chunked transfer encoding without a content-length
const chunked_upload_response = await fetch(endpoint_url, {
method: 'POST',
headers: {
'x-file-name': 'request_upload_transfer.jpg',
},
body: fs.createReadStream(test_file_path),
});
// Validate the hash uploaded on the server side with the expected hash from client side
const chunked_upload_body = await chunked_upload_response.json();
assert_log(
group,
`${candidate} Piped Upload With Chunked Encoding Transfer - ${expected_hash} === ${chunked_upload_body.hash} - ${test_file_stats.size} bytes`,
() => expected_hash === chunked_upload_body.hash
);
}
module.exports = {
test_request_stream_pipe,
};In the test case above, the buffer_upload_response fetch tests the streaming by sending a Buffer of the test file which works as expected since a content-length header is sent along with the fetch request. But the chunked_upload_response fetch request throws an FetchError: request to http://127.0.0.1:8080/tests/request/stream-pipe failed, reason: write ECONNRESET error signifying that the server abruptly closed the connection even though the server does not crash. I have provided the console logs below so you can get more context for request headers and body chunks being received on the server side in both fetch requests.
Logs:
[12/19/2021 12:51:06:379ms PM][TESTING] Successfully Started HyperExpress HTTP Server @ 127.0.0.1:8080
[12/19/2021 12:51:06:492ms PM][REQUEST] Testing HyperExpress.Request Object...
[12/19/2021 12:51:06:494ms PM][REQUEST] Generating A Large 10,485,760 Characters Size Body To Simulate Too-Large Large Payload...
Raw Chunk -> 31
EOF Chunk ->
[12/19/2021 12:51:06:540ms PM][REQUEST] Verified Too Large Body 413 HTTP Code Reject
Raw Chunk -> 65086
Raw Chunk -> 65536
Raw Chunk -> 131072
Raw Chunk -> 262144
Raw Chunk -> 524288
Raw Chunk -> 458793
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 64798
Raw Chunk -> 524288
Raw Chunk -> 523490
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 1962
EOF Chunk ->
[12/19/2021 12:51:06:950ms PM][REQUEST] Verified Middleware Execution & Timing Test
[12/19/2021 12:51:06:951ms PM][REQUEST] Verified Middleware Property Binding Test
[12/19/2021 12:51:06:951ms PM][REQUEST] Verified Route Specific Middleware Avoidance Test
[12/19/2021 12:51:07:208ms PM][REQUEST] Verified Route Specific Middleware Binding & Property Test
[12/19/2021 12:51:07:210ms PM][REQUEST] Verified HyperExpress.Request Middleware Double Iteration Violation
[12/19/2021 12:51:07:211ms PM][REQUEST] Verified HyperExpress.Request Middleware Thrown Iteration Error Handler
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.method
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.url
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.path
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.query
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.ip
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.proxy_ip
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.headers["x-test-value", "cookie", "content-length"]
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.query_parameters
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.path_parameters
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.cookies
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.body
{
'x-file-name': 'request_upload_buffer.jpg',
accept: '*/*',
'content-length': '879067',
'user-agent': 'node-fetch/1.0 (+https://github.com/bitinn/node-fetch)',
'accept-encoding': 'gzip,deflate',
connection: 'close',
host: '127.0.0.1:8080'
}
Copied Chunk -> 65234
Copied Chunk -> 524288
Paused Request!
Resumed Request!
Copied Chunk -> 289545
Paused Request!
EOF Chunk ->
Resumed Request!
[12/19/2021 12:51:07:222ms PM][REQUEST] Verified HyperExpress.Request.stream Piped Upload With Content Length - b65f7a483ee36dacdf5a8896b19937f8 === b65f7a483ee36dacdf5a8896b19937f8 - 879067 bytes
{
'x-file-name': 'request_upload_transfer.jpg',
accept: '*/*',
'user-agent': 'node-fetch/1.0 (+https://github.com/bitinn/node-fetch)',
'accept-encoding': 'gzip,deflate',
connection: 'close',
host: '127.0.0.1:8080',
'transfer-encoding': 'chunked'
}
Copied Chunk -> 0
EOF Chunk ->
FetchError: request to http://127.0.0.1:8080/tests/request/stream-pipe failed, reason: write ECONNRESET
at ClientRequest.<anonymous> (F:\NPM Packages\hyper-express\tests\node_modules\node-fetch\lib\index.js:1483:11)
at ClientRequest.emit (node:events:406:35)
at Socket.socketErrorListener (node:_http_client:447:9)
at Socket.emit (node:events:394:28)
at emitErrorNT (node:internal/streams/destroy:157:8)
at emitErrorCloseNT (node:internal/streams/destroy:122:3)
at processTicksAndRejections (node:internal/process/task_queues:83:21) {
type: 'system',
errno: 'ECONNRESET',
code: 'ECONNRESET'
}