Skip to content

Commit

Permalink
Merge pull request #13453 from zhengjitf/fix-11601
Browse files Browse the repository at this point in the history
fix(core): possible memory leak when using server side events
  • Loading branch information
kamilmysliwiec authored Jun 3, 2024
2 parents e3a47b6 + f797e16 commit 945f7a0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
7 changes: 5 additions & 2 deletions packages/core/router/router-response-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
import { catchError, debounce, map } from 'rxjs/operators';
import { catchError, concatMap, map } from 'rxjs/operators';
import {
AdditionalHeaders,
WritableHeaderStream,
Expand Down Expand Up @@ -128,7 +128,7 @@ export class RouterResponseController {

return { data: message as object | string };
}),
debounce(
concatMap(
message =>
new Promise<void>(resolve =>
stream.writeMessage(message, () => resolve()),
Expand All @@ -153,6 +153,9 @@ export class RouterResponseController {

request.on('close', () => {
subscription.unsubscribe();
if (!stream.writableEnded) {
stream.end();
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/router/sse-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class SseStream extends Transform {
message.id = this.lastEventId.toString();
}

if (!this.write(message, 'utf-8', cb)) {
if (!this.write(message, 'utf-8')) {
this.once('drain', cb);
} else {
process.nextTick(cb);
Expand Down
66 changes: 66 additions & 0 deletions packages/core/test/router/router-response-controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream';
import { HttpStatus, RequestMethod } from '../../../common';
import { RouterResponseController } from '../../router/router-response-controller';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
import { SseStream } from '../../router/sse-stream';

describe('RouterResponseController', () => {
let adapter: NoopHttpAdapter;
Expand Down Expand Up @@ -374,6 +375,71 @@ data: test
done();
});

describe('when writing data too densely', () => {
const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners;
const MAX_LISTENERS = 1;
const sandbox = sinon.createSandbox();

beforeEach(() => {
// Can't access to the internal sseStream,
// as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process`
const PROCESS_MAX_LISTENERS = process.getMaxListeners();
SseStream.defaultMaxListeners = MAX_LISTENERS;
process.setMaxListeners(PROCESS_MAX_LISTENERS);

const sseStream = sinon.createStubInstance(SseStream);
const originalWrite = SseStream.prototype.write;
// Make `.write()` always return false, so as to listen `drain` event
sseStream.write.callsFake(function (...args: any[]) {
originalWrite.apply(this, args);
return false;
});
sandbox.replace(SseStream.prototype, 'write', sseStream.write);
});

afterEach(() => {
sandbox.restore();
SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS;
});

it('should not cause memory leak', async () => {
let maxDrainListenersExceededWarning = null;
process.on('warning', (warning: any) => {
if (
warning.name === 'MaxListenersExceededWarning' &&
warning.emitter instanceof SseStream &&
warning.type === 'drain' &&
warning.count === MAX_LISTENERS + 1
) {
maxDrainListenersExceededWarning = warning;
}
});

const result = new Subject();

const response = new Writable();
response._write = () => {};

const request = new Writable();
request._write = () => {};

routerResponseController.sse(
result,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);

// Send multiple messages simultaneously
Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) =>
result.next(String(i)),
);

await new Promise(resolve => process.nextTick(resolve));

expect(maxDrainListenersExceededWarning).to.equal(null);
});
});

describe('when there is an error', () => {
it('should close the request', done => {
const result = new Subject();
Expand Down

0 comments on commit 945f7a0

Please sign in to comment.