1
1
import Operator from '../Operator' ;
2
2
import Observer from '../Observer' ;
3
+ import Subscription from '../Subscription' ;
3
4
import Subscriber from '../Subscriber' ;
4
5
import Observable from '../Observable' ;
5
6
import Subject from '../Subject' ;
6
7
import Map from '../util/Map' ;
7
8
import FastMap from '../util/FastMap' ;
8
- import GroupSubject from '../subjects/GroupSubject ' ;
9
+ import { RefCountSubscription , GroupedObservable , InnerRefCountSubscription } from './groupBy-support ' ;
9
10
10
11
import tryCatch from '../util/tryCatch' ;
11
12
import { errorObject } from '../util/errorObject' ;
12
13
import bindCallback from '../util/bindCallback' ;
13
14
14
- export default function groupBy < T , R > ( keySelector : ( value : T ) => string ,
15
- elementSelector ?: ( value : T ) => R ,
16
- durationSelector ?: ( grouped : GroupSubject < R > ) => Observable < any > ) : Observable < GroupSubject < R > > {
17
- return this . lift ( new GroupByOperator < T , R > ( keySelector , durationSelector , elementSelector ) ) ;
15
+ export function groupBy < T , R > ( keySelector : ( value : T ) => string ,
16
+ elementSelector ?: ( value : T ) => R ,
17
+ durationSelector ?: ( grouped : GroupedObservable < R > ) => Observable < any > ) : GroupByObservable < T , R > {
18
+ return new GroupByObservable < T , R > ( this , keySelector , elementSelector , durationSelector ) ;
18
19
}
19
20
20
- class GroupByOperator < T , R > implements Operator < T , R > {
21
- constructor ( private keySelector : ( value : T ) => string ,
22
- private durationSelector ?: ( grouped : GroupSubject < R > ) => Observable < any > ,
23
- private elementSelector ?: ( value : T ) => R ) {
21
+ export class GroupByObservable < T , R > extends Observable < GroupedObservable < R > > {
22
+ constructor ( public source : Observable < T > ,
23
+ private keySelector : ( value : T ) => string ,
24
+ private elementSelector ?: ( value : T ) => R ,
25
+ private durationSelector ?: ( grouped : GroupedObservable < R > ) => Observable < any > ) {
26
+ super ( ) ;
24
27
}
25
28
26
- call ( subscriber : Subscriber < R > ) : Subscriber < T > {
27
- return new GroupBySubscriber < T , R > (
28
- subscriber , this . keySelector , this . durationSelector , this . elementSelector
29
+ _subscribe ( subscriber ) {
30
+ const refCountSubscription = new RefCountSubscription ( ) ;
31
+ const groupBySubscriber = new GroupBySubscriber (
32
+ subscriber , refCountSubscription , this . keySelector , this . elementSelector , this . durationSelector
29
33
) ;
34
+ refCountSubscription . setPrimary ( this . source . subscribe ( groupBySubscriber ) ) ;
35
+ return refCountSubscription ;
30
36
}
31
37
}
32
38
33
39
class GroupBySubscriber < T , R > extends Subscriber < T > {
34
40
private groups = null ;
35
41
36
42
constructor ( destination : Subscriber < R > ,
43
+ private refCountSubscription : RefCountSubscription < T > ,
37
44
private keySelector : ( value : T ) => string ,
38
- private durationSelector ?: ( grouped : GroupSubject < R > ) => Observable < any > ,
39
- private elementSelector ?: ( value : T ) => R ) {
40
- super ( destination ) ;
45
+ private elementSelector ?: ( value : T ) => R ,
46
+ private durationSelector ?: ( grouped : GroupedObservable < R > ) => Observable < any > ) {
47
+ super ( ) ;
48
+ this . destination = destination ;
49
+ this . add ( destination ) ;
41
50
}
42
51
43
52
_next ( x : T ) {
@@ -53,27 +62,28 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
53
62
groups = this . groups = typeof key === 'string' ? new FastMap ( ) : new Map ( ) ;
54
63
}
55
64
56
- let group : GroupSubject < R > = groups . get ( key ) ;
65
+ let group : Subject < R > = groups . get ( key ) ;
57
66
58
67
if ( ! group ) {
59
- groups . set ( key , group = new GroupSubject ( key ) ) ;
68
+ groups . set ( key , group = new Subject ( ) ) ;
69
+ let groupedObservable = new GroupedObservable < R > ( key , group , this . refCountSubscription ) ;
60
70
61
71
if ( durationSelector ) {
62
- let duration = tryCatch ( durationSelector ) ( group ) ;
72
+ let duration = tryCatch ( durationSelector ) ( groupedObservable ) ;
63
73
if ( duration === errorObject ) {
64
74
this . error ( duration . e ) ;
65
75
} else {
66
- this . add ( duration . _subscribe ( new GroupDurationSubscriber ( group , this ) ) ) ;
76
+ this . add ( duration . _subscribe ( new GroupDurationSubscriber ( key , group , this ) ) ) ;
67
77
}
68
78
}
69
79
70
- this . destination . next ( group ) ;
80
+ this . destination . next ( groupedObservable ) ;
71
81
}
72
82
73
83
if ( elementSelector ) {
74
84
let value = tryCatch ( elementSelector ) ( x ) ;
75
85
if ( value === errorObject ) {
76
- group . error ( value . e ) ;
86
+ this . error ( value . e ) ;
77
87
} else {
78
88
group . next ( value ) ;
79
89
}
@@ -111,26 +121,24 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
111
121
}
112
122
113
123
class GroupDurationSubscriber < T > extends Subscriber < T > {
114
- constructor ( private group : GroupSubject < T > ,
124
+ constructor ( private key : string ,
125
+ private group : Subject < T > ,
115
126
private parent : GroupBySubscriber < any , T > ) {
116
127
super ( null ) ;
117
128
}
118
129
119
130
_next ( value : T ) {
120
- const group = this . group ;
121
- group . complete ( ) ;
122
- this . parent . removeGroup ( group . key ) ;
131
+ this . group . complete ( ) ;
132
+ this . parent . removeGroup ( this . key ) ;
123
133
}
124
134
125
135
_error ( err : any ) {
126
- const group = this . group ;
127
- group . error ( err ) ;
128
- this . parent . removeGroup ( group . key ) ;
136
+ this . group . error ( err ) ;
137
+ this . parent . removeGroup ( this . key ) ;
129
138
}
130
139
131
140
_complete ( ) {
132
- const group = this . group ;
133
- group . complete ( ) ;
134
- this . parent . removeGroup ( group . key ) ;
141
+ this . group . complete ( ) ;
142
+ this . parent . removeGroup ( this . key ) ;
135
143
}
136
144
}
0 commit comments