Skip to content

Commit

Permalink
fix(stream): premature close when it is paused (sidorares#2416)
Browse files Browse the repository at this point in the history
* Fix premature close

* Add test for premature close of the stream when it is paused

* Restore package-lock.json
  • Loading branch information
jansivans authored Jan 31, 2024
1 parent 35c61d1 commit 7c6bc64
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
12 changes: 7 additions & 5 deletions lib/commands/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Query extends Command {
this._receivedFieldsCount = 0;
this._resultIndex = 0;
this._localStream = null;
this._unpipeStream = function() {};
this._unpipeStream = function () { };
this._streamFactory = options.infileStreamFactory;
this._connection = null;
}
Expand Down Expand Up @@ -155,7 +155,7 @@ class Query extends Command {
const onPause = () => {
this._localStream.pause();
};
const onData = function(data) {
const onData = function (data) {
const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
data.copy(dataWithHeader, 4);
connection.writePacket(
Expand Down Expand Up @@ -227,7 +227,7 @@ class Query extends Command {
}

/* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
row(packet, _connection) {
row(packet, _connection) {
if (packet.isEOF()) {
const status = packet.eofStatusFlags();
const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
Expand Down Expand Up @@ -279,11 +279,13 @@ class Query extends Command {
});
this.on('end', () => {
stream.push(null); // pushing null, indicating EOF
setImmediate(() => stream.emit('close')); // notify readers that query has completed
});
this.on('fields', fields => {
stream.emit('fields', fields); // replicate old emitter
});
stream.on('end', () => {
stream.emit('close');
});
return stream;
}

Expand All @@ -302,7 +304,7 @@ class Query extends Command {
Timers.clearTimeout(this.queryTimeout);
this.queryTimeout = null;
}

const err = new Error('Query inactivity timeout');
err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
Expand Down
7 changes: 7 additions & 0 deletions test/integration/connection/test-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ let rows;
const rows1 = [];
const rows2 = [];
const rows3 = [];
const rows4 = [];

connection.query(
[
Expand Down Expand Up @@ -65,11 +66,17 @@ connection.execute('SELECT * FROM announcements', async (err, _rows) => {
for await (const row of s3) {
rows3.push(row);
}
const s4 = connection.query('SELECT * FROM announcements').stream();
for await (const row of s4) {
await new Promise(resolve => setTimeout(resolve, 1000));
rows4.push(row);
}
});

process.on('exit', () => {
assert.deepEqual(rows.length, 2);
assert.deepEqual(rows, rows1);
assert.deepEqual(rows, rows2);
assert.deepEqual(rows, rows3);
assert.deepEqual(rows, rows4);
});

0 comments on commit 7c6bc64

Please sign in to comment.