Skip to content

Commit 904d617

Browse files
committed
feat(multiplex): add multiplex operator to WebSocketSubject
- fix to ensure expected web socket close behavior
1 parent 580f69a commit 904d617

File tree

2 files changed

+72
-43
lines changed

2 files changed

+72
-43
lines changed

spec/observables/dom/webSocket-spec.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,45 @@ describe('Observable.webSocket', function () {
303303
expect(closes[1]).toBe(expected[1]);
304304
});
305305
});
306+
307+
describe('multiplex', function () {
308+
it('should multiplex over the websocket', function () {
309+
var results = [];
310+
var subject = Observable.webSocket('ws://websocket');
311+
var source = subject.multiplex(function () {
312+
return { sub: 'foo'};
313+
}, function () {
314+
return { unsub: 'foo' };
315+
}, function (value) {
316+
return value.name === 'foo';
317+
});
318+
319+
var sub = source.subscribe(function (x) {
320+
results.push(x.value);
321+
});
322+
var socket = MockWebSocket.lastSocket();
323+
socket.open();
324+
325+
expect(socket.lastMessageSent()).toEqual({ sub: 'foo' });
326+
327+
[1, 2, 3, 4, 5].map(function (x) {
328+
return {
329+
name: x % 3 === 0 ? 'bar' : 'foo',
330+
value: x
331+
};
332+
}).forEach(function (x) {
333+
socket.triggerMessage(JSON.stringify(x));
334+
});
335+
336+
expect(results).toEqual([1, 2, 4, 5]);
337+
338+
spyOn(socket, 'close').and.callThrough();
339+
sub.unsubscribe();
340+
expect(socket.lastMessageSent()).toEqual({ unsub: 'foo' });
341+
342+
expect(socket.close).toHaveBeenCalled();
343+
});
344+
});
306345
});
307346

308347
var sockets = [];

src/observable/dom/webSocket.ts

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,38 @@ export class WebSocketSubject<T> extends Subject<T> {
6565
return sock;
6666
}
6767

68-
multiplex(subMsg: any, unsubMsg: any, messageFilter: (value: T) => boolean) {
69-
return this.lift(new MultiplexOperator(this, subMsg, unsubMsg, messageFilter));
68+
// TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
69+
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
70+
const self = this;
71+
return new Observable(observer => {
72+
const result = tryCatch(subMsg)();
73+
if (result === errorObject) {
74+
observer.error(errorObject.e);
75+
} else {
76+
self.next(result);
77+
}
78+
79+
const subscription = self.subscribe(x => {
80+
const result = tryCatch(messageFilter)(x);
81+
if (result === errorObject) {
82+
observer.error(errorObject.e);
83+
} else if (result) {
84+
observer.next(x);
85+
}
86+
},
87+
err => observer.error(err),
88+
() => observer.complete());
89+
90+
return () => {
91+
const result = tryCatch(unsubMsg)();
92+
if (result === errorObject) {
93+
observer.error(errorObject.e);
94+
} else {
95+
self.next(result);
96+
}
97+
subscription.unsubscribe();
98+
};
99+
});
70100
}
71101

72102
_unsubscribe() {
@@ -157,12 +187,11 @@ export class WebSocketSubject<T> extends Subject<T> {
157187
self._finalNext(result);
158188
}
159189
};
160-
return subscription;
161190
}
162191

163192
return new Subscription(() => {
164193
subscription.unsubscribe();
165-
if (this.observers.length === 0) {
194+
if (!this.observers || this.observers.length === 0) {
166195
const { socket } = this;
167196
if (socket && socket.readyState < 2) {
168197
socket.close();
@@ -173,43 +202,4 @@ export class WebSocketSubject<T> extends Subject<T> {
173202
}
174203
});
175204
}
176-
}
177-
178-
export class MultiplexOperator<T, R> implements Operator<T, R> {
179-
constructor(private socketSubject: WebSocketSubject<T>,
180-
private subscribeMessage: any,
181-
private unsubscribeMessage,
182-
private messageFilter: (data: any) => R) {
183-
// noop
184-
}
185-
186-
call(subscriber: Subscriber<R>) {
187-
return new MultiplexSubscriber(subscriber, this.socketSubject, this.subscribeMessage, this.unsubscribeMessage, this.messageFilter);
188-
}
189-
}
190-
191-
export class MultiplexSubscriber<T> extends Subscriber<T> {
192-
constructor(destination: Observer<T>,
193-
private socketSubject: WebSocketSubject<any>,
194-
private subscribeMessage: any,
195-
private unsubscribeMessage: any,
196-
private messageFilter: (data: any) => T) {
197-
super(destination);
198-
199-
socketSubject.next(subscribeMessage);
200-
}
201-
202-
next(value: any) {
203-
const pass = tryCatch(this.messageFilter)(value);
204-
if (pass === errorObject) {
205-
this.destination.error(errorObject.e);
206-
} else if (pass) {
207-
this.destination.next(value);
208-
}
209-
}
210-
211-
unsubscribe() {
212-
this.socketSubject.next(this.unsubscribeMessage);
213-
super.unsubscribe();
214-
}
215205
}

0 commit comments

Comments
 (0)