@@ -65,8 +65,38 @@ export class WebSocketSubject<T> extends Subject<T> {
65
65
return sock ;
66
66
}
67
67
68
- multiplex ( subMsg : any , unsubMsg : any , messageFilter : ( value : T ) => boolean ) {
69
- return this . lift ( new MultiplexOperator ( this , subMsg , unsubMsg , messageFilter ) ) ;
68
+ // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
69
+ multiplex ( subMsg : ( ) => any , unsubMsg : ( ) => any , messageFilter : ( value : T ) => boolean ) {
70
+ const self = this ;
71
+ return new Observable ( observer => {
72
+ const result = tryCatch ( subMsg ) ( ) ;
73
+ if ( result === errorObject ) {
74
+ observer . error ( errorObject . e ) ;
75
+ } else {
76
+ self . next ( result ) ;
77
+ }
78
+
79
+ const subscription = self . subscribe ( x => {
80
+ const result = tryCatch ( messageFilter ) ( x ) ;
81
+ if ( result === errorObject ) {
82
+ observer . error ( errorObject . e ) ;
83
+ } else if ( result ) {
84
+ observer . next ( x ) ;
85
+ }
86
+ } ,
87
+ err => observer . error ( err ) ,
88
+ ( ) => observer . complete ( ) ) ;
89
+
90
+ return ( ) => {
91
+ const result = tryCatch ( unsubMsg ) ( ) ;
92
+ if ( result === errorObject ) {
93
+ observer . error ( errorObject . e ) ;
94
+ } else {
95
+ self . next ( result ) ;
96
+ }
97
+ subscription . unsubscribe ( ) ;
98
+ } ;
99
+ } ) ;
70
100
}
71
101
72
102
_unsubscribe ( ) {
@@ -157,12 +187,11 @@ export class WebSocketSubject<T> extends Subject<T> {
157
187
self . _finalNext ( result ) ;
158
188
}
159
189
} ;
160
- return subscription ;
161
190
}
162
191
163
192
return new Subscription ( ( ) => {
164
193
subscription . unsubscribe ( ) ;
165
- if ( this . observers . length === 0 ) {
194
+ if ( ! this . observers || this . observers . length === 0 ) {
166
195
const { socket } = this ;
167
196
if ( socket && socket . readyState < 2 ) {
168
197
socket . close ( ) ;
@@ -173,43 +202,4 @@ export class WebSocketSubject<T> extends Subject<T> {
173
202
}
174
203
} ) ;
175
204
}
176
- }
177
-
178
- export class MultiplexOperator < T , R > implements Operator < T , R > {
179
- constructor ( private socketSubject : WebSocketSubject < T > ,
180
- private subscribeMessage : any ,
181
- private unsubscribeMessage ,
182
- private messageFilter : ( data : any ) => R ) {
183
- // noop
184
- }
185
-
186
- call ( subscriber : Subscriber < R > ) {
187
- return new MultiplexSubscriber ( subscriber , this . socketSubject , this . subscribeMessage , this . unsubscribeMessage , this . messageFilter ) ;
188
- }
189
- }
190
-
191
- export class MultiplexSubscriber < T > extends Subscriber < T > {
192
- constructor ( destination : Observer < T > ,
193
- private socketSubject : WebSocketSubject < any > ,
194
- private subscribeMessage : any ,
195
- private unsubscribeMessage : any ,
196
- private messageFilter : ( data : any ) => T ) {
197
- super ( destination ) ;
198
-
199
- socketSubject . next ( subscribeMessage ) ;
200
- }
201
-
202
- next ( value : any ) {
203
- const pass = tryCatch ( this . messageFilter ) ( value ) ;
204
- if ( pass === errorObject ) {
205
- this . destination . error ( errorObject . e ) ;
206
- } else if ( pass ) {
207
- this . destination . next ( value ) ;
208
- }
209
- }
210
-
211
- unsubscribe ( ) {
212
- this . socketSubject . next ( this . unsubscribeMessage ) ;
213
- super . unsubscribe ( ) ;
214
- }
215
205
}
0 commit comments