@@ -436,5 +436,56 @@ describe('Observable.webSocket', () => {
436
436
437
437
expect ( results ) . to . deep . equal ( [ 'sub' , 'unsub' , 'sub' , error , 'unsub' ] ) ;
438
438
} ) ;
439
+
440
+ it ( 'should not close the socket until all subscriptions complete' , ( ) => {
441
+ const socketSubject = Rx . Observable . webSocket ( < any > { url : 'ws://mysocket' } ) ;
442
+ const results = [ ] ;
443
+ const socketMessages = [
444
+ { id : 'A' } ,
445
+ { id : 'B' } ,
446
+ { id : 'A' , complete : true } ,
447
+ { id : 'B' } ,
448
+ { id : 'B' , complete : true } ,
449
+ ] ;
450
+
451
+ socketSubject . multiplex (
452
+ ( ) => 'no-op' ,
453
+ ( ) => results . push ( 'A unsub' ) ,
454
+ ( req : any ) => req . id === 'A' )
455
+ . takeWhile ( ( req : any ) => ! req . complete )
456
+ . subscribe (
457
+ ( ) => results . push ( 'A next' ) ,
458
+ ( e ) => results . push ( 'A error ' + e ) ,
459
+ ( ) => results . push ( 'A complete' )
460
+ ) ;
461
+
462
+ socketSubject . multiplex (
463
+ ( ) => 'no-op' ,
464
+ ( ) => results . push ( 'B unsub' ) ,
465
+ ( req : any ) => req . id === 'B' )
466
+ . takeWhile ( ( req : any ) => ! req . complete )
467
+ . subscribe (
468
+ ( ) => results . push ( 'B next' ) ,
469
+ ( e ) => results . push ( 'B error ' + e ) ,
470
+ ( ) => results . push ( 'B complete' )
471
+ ) ;
472
+
473
+ // Setup socket and send messages
474
+ let socket = MockWebSocket . lastSocket ;
475
+ socket . open ( ) ;
476
+ socketMessages . forEach ( ( msg ) => {
477
+ socket . triggerMessage ( JSON . stringify ( msg ) ) ;
478
+ } ) ;
479
+
480
+ expect ( results ) . to . deep . equal ( [
481
+ 'A next' ,
482
+ 'B next' ,
483
+ 'A complete' ,
484
+ 'A unsub' ,
485
+ 'B next' ,
486
+ 'B complete' ,
487
+ 'B unsub' ,
488
+ ] ) ;
489
+ } ) ;
439
490
} ) ;
440
- } ) ;
491
+ } ) ;
0 commit comments