Skip to content

Posting data with chunked transfer encoding causes request to be abruptly closed #669

@kartikk221

Description

@kartikk221

Hello, I was in the process of implementing file uploading/body streaming into https://github.com/kartikk221/hyper-express and ran into an issue when attempting to stream file data using chunked transfer encoding. From experimenting with various scenarios, it seems uWebSockets.js instantly closes a request connection if it detects that body data is being sent with a request that has no content-length header. I have provided some context below for the implementation/test case in which the bug occurs:

Implementation (/src/components/http/Request.js):

/**
 * Initializes a readable stream which consumes body data from the uWS.Response.onData() handler.
 * Note! This stream will automatically handle backpressure by pausing/resuming the response data flow.
 *
 * @private
 * @param {stream.ReadableOptions} options
 */
_start_streaming(options) {
    // Throw an error if we try to start streaming when one is already in progress
    if (this.#body_stream)
        throw new Error(
            'HyperExpress.Request._start_streaming() -> This method has been called more than once which should not occur.'
        );

    // Create a readable stream which pipes data from the uWS.Response.onData() handler
    const readable = new stream.Readable(options);

    // Bind a _read() handler so that consumer demand resumes the paused uWS request (backpressure release)
    readable._read = () => this.resume();

    // Bind a uWS.Response.onData() handler which will handle incoming chunks and pipe them to the readable stream
    const reference = this;
    this.#raw_response.onData((array_buffer, is_last) => {
        // Do not process chunk if we can no longer pipe it to the readable stream
        // (#body_stream is nulled elsewhere when the stream is torn down)
        if (reference.#body_stream === null) return;

        // Convert the ArrayBuffer to a Buffer reference
        // Provide raw chunks if specified and we have something consuming stream already
        // This will prevent unnecessary duplication of buffers
        let buffer;
        if (reference.#stream_raw_chunks && readable.listenerCount('data') > 0) {
            // Store a direct Buffer reference (zero-copy view) as this will be immediately consumed
            // NOTE(review): uWS deallocates the underlying memory after this callback returns,
            // so this is only safe for synchronous 'data' consumers — confirm against consumers
            buffer = Buffer.from(array_buffer);
            console.log('Raw Chunk ->', buffer.length);
        } else {
            // Store a copy of the array_buffer as we have no immediate consumer yet
            // If we do not copy, this chunk will be lost in stream queue as it will be deallocated by uWebsockets
            // (Buffer.concat allocates fresh memory, detaching the chunk from uWS's buffer)
            buffer = Buffer.concat([Buffer.from(array_buffer)]);
            console.log('Copied Chunk ->', buffer.length);
        }

        // Push the incoming chunk into readable stream for consumption
        // Pause the uWS request if our stream is backed up (push() returned false)
        if (!readable.push(buffer)) reference.pause();

        // Push a null chunk signaling an EOF to the stream to end it if this chunk is last
        if (is_last) {
            console.log('EOF Chunk ->');
            readable.push(null);
        }
    });

    // Store the readable stream locally for consumption
    this.#body_stream = readable;
}

Test Case:

const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
const { assert_log } = require('../../../scripts/operators.js');
const { HyperExpress, fetch, server } = require('../../../configuration.js');

// Scenario router and the fully-qualified endpoint URL the client will POST to
const router = new HyperExpress.Router();
const endpoint = '/tests/request';
const scenario_endpoint = '/stream-pipe';
const endpoint_url = server.base + endpoint + scenario_endpoint;

// Fixture file used as the upload payload; stats are used for log output
const test_file_path = path.resolve(path.join(__dirname, '../../../content/large-image.jpg'));
const test_file_stats = fs.statSync(test_file_path);

/**
 * Resolves the absolute on-disk path for a file inside the test "written" content directory.
 *
 * @param {string} file_name - Name of the file to be written
 * @returns {string} Absolute path for the written file
 */
function get_file_write_path(file_name) {
    const relative_path = '../../../content/written/' + file_name;
    return path.resolve(path.join(__dirname, relative_path));
}

// Create Backend HTTP Route
router.post(scenario_endpoint, async (request, response) => {
    console.log(request.headers);

    // Resolve the destination path for the uploaded file.
    // NOTE: renamed from `path` to `file_path` to avoid shadowing the `path` module required above.
    const file_name = request.headers['x-file-name'];
    const file_path = get_file_write_path(file_name);
    const writable = fs.createWriteStream(file_path);

    // Pipe the readable body stream to the writable and wait for it to finish.
    // Also reject on write errors so a failed disk write cannot hang this handler forever.
    request.stream.pipe(writable);
    await new Promise((resolve, reject) => {
        writable.once('finish', resolve);
        writable.once('error', reject);
    });

    // Read the written file's buffer and calculate its md5 hash
    const written_buffer = fs.readFileSync(file_path);
    const written_hash = crypto.createHash('md5').update(written_buffer).digest('hex');

    // Cleanup the written file for future testing
    fs.rmSync(file_path);

    // Return the written hash to be validated on client side
    return response.json({
        hash: written_hash,
    });
});

// Bind router to webserver: mount this scenario's routes under the shared /tests/request prefix
const { TEST_SERVER } = require('../../Server.js');
TEST_SERVER.use(endpoint, router);

/**
 * Uploads the provided body to the scenario endpoint and asserts that the
 * md5 hash computed on the server side matches the expected client-side hash.
 *
 * @param {string} group - Assertion log group name
 * @param {string} candidate - Candidate identifier for the assertion log
 * @param {string} label - Human-readable label describing the upload variant
 * @param {string} file_name - File name header sent so the server knows where to write
 * @param {Buffer|stream.Readable} body - Request body (Buffer => content-length, stream => chunked)
 * @param {string} expected_hash - md5 hash of the original file computed client-side
 */
async function _upload_and_validate(group, candidate, label, file_name, body, expected_hash) {
    const response = await fetch(endpoint_url, {
        method: 'POST',
        headers: {
            'x-file-name': file_name,
        },
        body,
    });

    // Validate the hash uploaded on the server side with the expected hash from client side
    const { hash } = await response.json();
    assert_log(
        group,
        `${candidate} ${label} - ${expected_hash} === ${hash} - ${test_file_stats.size} bytes`,
        () => expected_hash === hash
    );
}

/**
 * Verifies HyperExpress.Request.stream piping for both upload modes:
 * a Buffer body (content-length present) and a file stream body (chunked transfer encoding).
 */
async function test_request_stream_pipe() {
    const group = 'REQUEST';
    const candidate = 'HyperExpress.Request.stream';

    // Compute the expected md5 hash of the fixture file on the client side
    const expected_buffer = fs.readFileSync(test_file_path);
    const expected_hash = crypto.createHash('md5').update(expected_buffer).digest('hex');

    // Send a buffer of the file in the request body so we have a content-length on server side
    await _upload_and_validate(
        group,
        candidate,
        'Piped Upload With Content Length',
        'request_upload_buffer.jpg',
        expected_buffer,
        expected_hash
    );

    // Stream the file to send using chunked transfer encoding without a content-length
    await _upload_and_validate(
        group,
        candidate,
        'Piped Upload With Chunked Encoding Transfer',
        'request_upload_transfer.jpg',
        fs.createReadStream(test_file_path),
        expected_hash
    );
}

// Expose this scenario's test function to the suite runner
module.exports = {
    test_request_stream_pipe,
};

In the test case above, the buffer_upload_response fetch tests the streaming by sending a Buffer of the test file, which works as expected since a content-length header is sent along with the fetch request. But the chunked_upload_response fetch request throws a FetchError: request to http://127.0.0.1:8080/tests/request/stream-pipe failed, reason: write ECONNRESET error, signifying that the server abruptly closed the connection even though the server does not crash. I have provided the console logs below so you can get more context for the request headers and body chunks being received on the server side in both fetch requests.

Logs:

[12/19/2021 12:51:06:379ms PM][TESTING] Successfully Started HyperExpress HTTP Server @ 127.0.0.1:8080
[12/19/2021 12:51:06:492ms PM][REQUEST] Testing HyperExpress.Request Object...
[12/19/2021 12:51:06:494ms PM][REQUEST] Generating A Large 10,485,760 Characters Size Body To Simulate Too-Large Large Payload...
Raw Chunk -> 31
EOF Chunk ->
[12/19/2021 12:51:06:540ms PM][REQUEST] Verified Too Large Body 413 HTTP Code Reject
Raw Chunk -> 65086
Raw Chunk -> 65536
Raw Chunk -> 131072
Raw Chunk -> 262144
Raw Chunk -> 524288
Raw Chunk -> 458793
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 64798
Raw Chunk -> 524288
Raw Chunk -> 523490
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 524288
Raw Chunk -> 1962
EOF Chunk ->
[12/19/2021 12:51:06:950ms PM][REQUEST] Verified Middleware Execution & Timing Test
[12/19/2021 12:51:06:951ms PM][REQUEST] Verified Middleware Property Binding Test
[12/19/2021 12:51:06:951ms PM][REQUEST] Verified Route Specific Middleware Avoidance Test
[12/19/2021 12:51:07:208ms PM][REQUEST] Verified Route Specific Middleware Binding & Property Test
[12/19/2021 12:51:07:210ms PM][REQUEST] Verified HyperExpress.Request Middleware Double Iteration Violation
[12/19/2021 12:51:07:211ms PM][REQUEST] Verified HyperExpress.Request Middleware Thrown Iteration Error Handler
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.method
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.url
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.path
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.query
[12/19/2021 12:51:07:212ms PM][REQUEST] Verified HyperExpress.Request.ip
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.proxy_ip
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.headers["x-test-value", "cookie", "content-length"]
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.query_parameters
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.path_parameters
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.cookies
[12/19/2021 12:51:07:213ms PM][REQUEST] Verified HyperExpress.Request.body
{
  'x-file-name': 'request_upload_buffer.jpg',
  accept: '*/*',
  'content-length': '879067',
  'user-agent': 'node-fetch/1.0 (+https://github.com/bitinn/node-fetch)',
  'accept-encoding': 'gzip,deflate',
  connection: 'close',
  host: '127.0.0.1:8080'
}
Copied Chunk -> 65234
Copied Chunk -> 524288
Paused Request!
Resumed Request!
Copied Chunk -> 289545
Paused Request!
EOF Chunk ->
Resumed Request!
[12/19/2021 12:51:07:222ms PM][REQUEST] Verified HyperExpress.Request.stream Piped Upload With Content Length - b65f7a483ee36dacdf5a8896b19937f8 === b65f7a483ee36dacdf5a8896b19937f8 - 879067 bytes
{
  'x-file-name': 'request_upload_transfer.jpg',
  accept: '*/*',
  'user-agent': 'node-fetch/1.0 (+https://github.com/bitinn/node-fetch)',
  'accept-encoding': 'gzip,deflate',
  connection: 'close',
  host: '127.0.0.1:8080',
  'transfer-encoding': 'chunked'
}
Copied Chunk -> 0
EOF Chunk ->
FetchError: request to http://127.0.0.1:8080/tests/request/stream-pipe failed, reason: write ECONNRESET
    at ClientRequest.<anonymous> (F:\NPM Packages\hyper-express\tests\node_modules\node-fetch\lib\index.js:1483:11)
    at ClientRequest.emit (node:events:406:35)
    at Socket.socketErrorListener (node:_http_client:447:9)
    at Socket.emit (node:events:394:28)
    at emitErrorNT (node:internal/streams/destroy:157:8)
    at emitErrorCloseNT (node:internal/streams/destroy:122:3)
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  type: 'system',
  errno: 'ECONNRESET',
  code: 'ECONNRESET'
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions