Skip to content

Commit a5e9cfe

Browse files
benleshjayphelps
authored andcommitted
fix(WebSocketSubject.prototype.multiplex): no longer nulls out socket after first unsubscribe (#2039)
This fix ensure the observers count goes to zero before the state is reset on the WebSocketSubject instance fixes #2037
1 parent 76a9abb commit a5e9cfe

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

spec/observables/dom/webSocket-spec.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,61 @@ describe('Observable.webSocket', () => {
507507
(<any>socket.close).restore();
508508
});
509509

510+
it('should keep the same socket for multiple multiplex subscriptions', () => {
511+
const socketSubject = Rx.Observable.webSocket(<any>{url: 'ws://mysocket'});
512+
const results = [];
513+
const socketMessages = [
514+
{id: 'A'},
515+
{id: 'B'},
516+
{id: 'A'},
517+
{id: 'B'},
518+
{id: 'B'},
519+
];
520+
521+
const sub1 = socketSubject.multiplex(
522+
() => 'no-op',
523+
() => results.push('A unsub'),
524+
(req: any) => req.id === 'A')
525+
.takeWhile((req: any) => !req.complete)
526+
.subscribe(
527+
() => results.push('A next'),
528+
(e) => results.push('A error ' + e),
529+
() => results.push('A complete')
530+
);
531+
532+
socketSubject.multiplex(
533+
() => 'no-op',
534+
() => results.push('B unsub'),
535+
(req: any) => req.id === 'B')
536+
.subscribe(
537+
() => results.push('B next'),
538+
(e) => results.push('B error ' + e),
539+
() => results.push('B complete')
540+
);
541+
542+
// Setup socket and send messages
543+
let socket = MockWebSocket.lastSocket;
544+
socket.open();
545+
socketMessages.forEach((msg, i) => {
546+
if (i === 1) {
547+
sub1.unsubscribe();
548+
expect(socketSubject.socket).to.equal(socket);
549+
}
550+
socket.triggerMessage(JSON.stringify(msg));
551+
});
552+
socket.triggerClose({ wasClean: true });
553+
554+
expect(results).to.deep.equal([
555+
'A next',
556+
'A unsub',
557+
'B next',
558+
'B next',
559+
'B next',
560+
'B complete',
561+
'B unsub',
562+
]);
563+
});
564+
510565
it('should not close the socket until all subscriptions complete', () => {
511566
const socketSubject = Rx.Observable.webSocket(<any>{url: 'ws://mysocket'});
512567
const results = [];

src/observable/dom/WebSocketSubject.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,12 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
220220
subscription.add(this._output.subscribe(subscriber));
221221
subscription.add(() => {
222222
const { socket } = this;
223-
if (this._output.observers.length === 0 && socket && socket.readyState === 1) {
224-
socket.close();
223+
if (this._output.observers.length === 0) {
224+
if (socket && socket.readyState === 1) {
225+
socket.close();
226+
}
227+
this._resetState();
225228
}
226-
this._resetState();
227229
});
228230
return subscription;
229231
}

0 commit comments

Comments
 (0)