1
1
import { Subject } from '../Subject' ;
2
+ import { Operator } from '../Operator' ;
3
+ import { Observer } from '../Observer' ;
2
4
import { Observable } from '../Observable' ;
3
5
import { Subscriber } from '../Subscriber' ;
4
6
import { Subscription } from '../Subscription' ;
@@ -8,8 +10,9 @@ import {Subscription} from '../Subscription';
8
10
*/
9
11
export class ConnectableObservable < T > extends Observable < T > {
10
12
11
- protected subject : Subject < T > ;
12
- protected subscription : Subscription ;
13
+ protected _subject : Subject < T > ;
14
+ protected _refCount : number = 0 ;
15
+ protected _connection : Subscription ;
13
16
14
17
constructor ( protected source : Observable < T > ,
15
18
protected subjectFactory : ( ) => Subject < T > ) {
@@ -20,133 +23,106 @@ export class ConnectableObservable<T> extends Observable<T> {
20
23
return this . getSubject ( ) . subscribe ( subscriber ) ;
21
24
}
22
25
23
- protected getSubject ( ) {
24
- const subject = this . subject ;
25
- if ( subject && ! subject . isUnsubscribed ) {
26
- return subject ;
27
- }
28
- return ( this . subject = this . subjectFactory ( ) ) ;
26
+ protected getSubject ( ) : Subject < T > {
27
+ return this . _subject || ( this . _subject = this . subjectFactory ( ) ) ;
29
28
}
30
29
31
30
connect ( ) : Subscription {
32
- const source = this . source ;
33
- let subscription = this . subscription ;
34
- if ( subscription && ! subscription . isUnsubscribed ) {
35
- return subscription ;
31
+ let connection = this . _connection ;
32
+ if ( ! connection ) {
33
+ connection = this . source . subscribe ( new ConnectableSubscriber ( this . getSubject ( ) , this ) ) ;
34
+ if ( connection . isUnsubscribed ) {
35
+ this . _connection = null ;
36
+ connection = Subscription . EMPTY ;
37
+ } else {
38
+ this . _connection = connection ;
39
+ }
36
40
}
37
- subscription = source . subscribe ( this . getSubject ( ) ) ;
38
- subscription . add ( new ConnectableSubscription ( this ) ) ;
39
- return ( this . subscription = subscription ) ;
41
+ return connection ;
40
42
}
41
43
42
44
refCount ( ) : Observable < T > {
43
- return new RefCountObservable ( this ) ;
44
- }
45
-
46
- /**
47
- * This method is opened for `ConnectableSubscription`.
48
- * Not to call from others.
49
- */
50
- _closeSubscription ( ) : void {
51
- this . subject = null ;
52
- this . subscription = null ;
45
+ return this . lift ( new RefCountOperator < T > ( this ) ) ;
53
46
}
54
47
}
55
48
56
- /**
57
- * We need this JSDoc comment for affecting ESDoc.
58
- * @ignore
59
- * @extends {Ignored }
60
- */
61
- class ConnectableSubscription extends Subscription {
62
- constructor ( protected connectable : ConnectableObservable < any > ) {
63
- super ( ) ;
49
+ class ConnectableSubscriber < T > extends Subscriber < T > {
50
+ constructor ( destination : Observer < T > ,
51
+ private connectable : ConnectableObservable < T > ) {
52
+ super ( destination ) ;
53
+ }
54
+ protected _error ( err : any ) : void {
55
+ this . _unsubscribe ( ) ;
56
+ super . _error ( err ) ;
57
+ }
58
+ protected _complete ( ) : void {
59
+ this . _unsubscribe ( ) ;
60
+ super . _complete ( ) ;
64
61
}
65
-
66
62
protected _unsubscribe ( ) {
67
- const connectable = this . connectable ;
68
- connectable . _closeSubscription ( ) ;
69
- this . connectable = null ;
63
+ const { connectable } = this ;
64
+ if ( connectable ) {
65
+ this . connectable = null ;
66
+ ( < any > connectable ) . _refCount = 0 ;
67
+ ( < any > connectable ) . _subject = null ;
68
+ ( < any > connectable ) . _connection = null ;
69
+ }
70
70
}
71
71
}
72
72
73
- /**
74
- * We need this JSDoc comment for affecting ESDoc.
75
- * @ignore
76
- * @extends {Ignored }
77
- */
78
- class RefCountObservable < T > extends Observable < T > {
79
- connection : Subscription ;
80
-
81
- constructor ( protected connectable : ConnectableObservable < T > ,
82
- public refCount : number = 0 ) {
83
- super ( ) ;
73
+ class RefCountOperator < T > implements Operator < T , T > {
74
+ constructor ( private connectable : ConnectableObservable < T > ) {
84
75
}
76
+ call ( subscriber : Subscriber < T > , source : any ) : any {
85
77
86
- protected _subscribe ( subscriber : Subscriber < T > ) {
87
- const connectable = this . connectable ;
88
- const refCountSubscriber : RefCountSubscriber < T > = new RefCountSubscriber ( subscriber , this ) ;
89
- const subscription = connectable . subscribe ( refCountSubscriber ) ;
90
- if ( ! subscription . isUnsubscribed && ++ this . refCount === 1 ) {
91
- refCountSubscriber . connection = this . connection = connectable . connect ( ) ;
78
+ const { connectable } = this ;
79
+ ( < any > connectable ) . _refCount ++ ;
80
+
81
+ const refCounter = new RefCountSubscriber ( subscriber , connectable ) ;
82
+ const subscription = source . _subscribe ( refCounter ) ;
83
+
84
+ if ( ! refCounter . isUnsubscribed ) {
85
+ ( < any > refCounter ) . connection = connectable . connect ( ) ;
92
86
}
87
+
93
88
return subscription ;
94
89
}
95
90
}
96
91
97
- /**
98
- * We need this JSDoc comment for affecting ESDoc.
99
- * @ignore
100
- * @extends {Ignored }
101
- */
102
92
class RefCountSubscriber < T > extends Subscriber < T > {
103
- connection : Subscription ;
104
93
105
- constructor ( public destination : Subscriber < T > ,
106
- private refCountObservable : RefCountObservable < T > ) {
107
- super ( null ) ;
108
- this . connection = refCountObservable . connection ;
109
- destination . add ( this ) ;
110
- }
94
+ private connection : Subscription ;
111
95
112
- protected _next ( value : T ) {
113
- this . destination . next ( value ) ;
96
+ constructor ( destination : Subscriber < T > ,
97
+ private connectable : ConnectableObservable < T > ) {
98
+ super ( destination ) ;
114
99
}
115
100
116
- protected _error ( err : any ) {
117
- this . _resetConnectable ( ) ;
118
- this . destination . error ( err ) ;
119
- }
101
+ protected _unsubscribe ( ) {
120
102
121
- protected _complete ( ) {
122
- this . _resetConnectable ( ) ;
123
- this . destination . complete ( ) ;
124
- }
103
+ const { connectable } = this ;
104
+ if ( ! connectable ) {
105
+ this . connection = null ;
106
+ return ;
107
+ }
125
108
126
- private _resetConnectable ( ) {
127
- const observable = this . refCountObservable ;
128
- const obsConnection = observable . connection ;
129
- const subConnection = this . connection ;
130
- if ( subConnection && subConnection === obsConnection ) {
131
- observable . refCount = 0 ;
132
- obsConnection . unsubscribe ( ) ;
133
- observable . connection = null ;
134
- this . unsubscribe ( ) ;
109
+ this . connectable = null ;
110
+ const refCount = ( < any > connectable ) . _refCount ;
111
+ if ( refCount <= 0 ) {
112
+ this . connection = null ;
113
+ return ;
135
114
}
136
- }
137
115
138
- protected _unsubscribe ( ) {
139
- const observable = this . refCountObservable ;
140
- if ( observable . refCount === 0 ) {
116
+ ( < any > connectable ) . _refCount = refCount - 1 ;
117
+ if ( refCount > 1 ) {
118
+ this . connection = null ;
141
119
return ;
142
120
}
143
- if ( -- observable . refCount === 0 ) {
144
- const obsConnection = observable . connection ;
145
- const subConnection = this . connection ;
146
- if ( subConnection && subConnection === obsConnection ) {
147
- obsConnection . unsubscribe ( ) ;
148
- observable . connection = null ;
149
- }
121
+
122
+ const { connection } = this ;
123
+ if ( connection ) {
124
+ this . connection = null ;
125
+ connection . unsubscribe ( ) ;
150
126
}
151
127
}
152
128
}
0 commit comments