|
1 | | -import { Subscriber } from '../Subscriber'; |
2 | | -import { Subscription } from '../Subscription'; |
| 1 | + |
3 | 2 | import { Observable } from '../Observable'; |
4 | | -import { Operator } from '../Operator'; |
5 | 3 | import { Subject } from '../Subject'; |
6 | | -import { Map } from '../util/Map'; |
7 | | -import { FastMap } from '../util/FastMap'; |
| 4 | +import { groupBy as higherOrder, GroupedObservable } from '../operators/groupBy'; |
8 | 5 |
|
9 | 6 | /* tslint:disable:max-line-length */ |
10 | 7 | export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>; |
@@ -84,210 +81,5 @@ export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => |
84 | 81 | elementSelector?: ((value: T) => R) | void, |
85 | 82 | durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, |
86 | 83 | subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>> { |
87 | | - return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); |
88 | | -} |
89 | | - |
90 | | -export interface RefCountSubscription { |
91 | | - count: number; |
92 | | - unsubscribe: () => void; |
93 | | - closed: boolean; |
94 | | - attemptedToUnsubscribe: boolean; |
95 | | -} |
96 | | - |
97 | | -class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> { |
98 | | - constructor(private keySelector: (value: T) => K, |
99 | | - private elementSelector?: ((value: T) => R) | void, |
100 | | - private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, |
101 | | - private subjectSelector?: () => Subject<R>) { |
102 | | - } |
103 | | - |
104 | | - call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any { |
105 | | - return source.subscribe(new GroupBySubscriber( |
106 | | - subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector |
107 | | - )); |
108 | | - } |
109 | | -} |
110 | | - |
111 | | -/** |
112 | | - * We need this JSDoc comment for affecting ESDoc. |
113 | | - * @ignore |
114 | | - * @extends {Ignored} |
115 | | - */ |
116 | | -class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription { |
117 | | - private groups: Map<K, Subject<T|R>> = null; |
118 | | - public attemptedToUnsubscribe: boolean = false; |
119 | | - public count: number = 0; |
120 | | - |
121 | | - constructor(destination: Subscriber<GroupedObservable<K, R>>, |
122 | | - private keySelector: (value: T) => K, |
123 | | - private elementSelector?: ((value: T) => R) | void, |
124 | | - private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, |
125 | | - private subjectSelector?: () => Subject<R>) { |
126 | | - super(destination); |
127 | | - } |
128 | | - |
129 | | - protected _next(value: T): void { |
130 | | - let key: K; |
131 | | - try { |
132 | | - key = this.keySelector(value); |
133 | | - } catch (err) { |
134 | | - this.error(err); |
135 | | - return; |
136 | | - } |
137 | | - |
138 | | - this._group(value, key); |
139 | | - } |
140 | | - |
141 | | - private _group(value: T, key: K) { |
142 | | - let groups = this.groups; |
143 | | - |
144 | | - if (!groups) { |
145 | | - groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); |
146 | | - } |
147 | | - |
148 | | - let group = groups.get(key); |
149 | | - |
150 | | - let element: R; |
151 | | - if (this.elementSelector) { |
152 | | - try { |
153 | | - element = this.elementSelector(value); |
154 | | - } catch (err) { |
155 | | - this.error(err); |
156 | | - } |
157 | | - } else { |
158 | | - element = <any>value; |
159 | | - } |
160 | | - |
161 | | - if (!group) { |
162 | | - group = this.subjectSelector ? this.subjectSelector() : new Subject<R>(); |
163 | | - groups.set(key, group); |
164 | | - const groupedObservable = new GroupedObservable(key, group, this); |
165 | | - this.destination.next(groupedObservable); |
166 | | - if (this.durationSelector) { |
167 | | - let duration: any; |
168 | | - try { |
169 | | - duration = this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group)); |
170 | | - } catch (err) { |
171 | | - this.error(err); |
172 | | - return; |
173 | | - } |
174 | | - this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); |
175 | | - } |
176 | | - } |
177 | | - |
178 | | - if (!group.closed) { |
179 | | - group.next(element); |
180 | | - } |
181 | | - } |
182 | | - |
183 | | - protected _error(err: any): void { |
184 | | - const groups = this.groups; |
185 | | - if (groups) { |
186 | | - groups.forEach((group, key) => { |
187 | | - group.error(err); |
188 | | - }); |
189 | | - |
190 | | - groups.clear(); |
191 | | - } |
192 | | - this.destination.error(err); |
193 | | - } |
194 | | - |
195 | | - protected _complete(): void { |
196 | | - const groups = this.groups; |
197 | | - if (groups) { |
198 | | - groups.forEach((group, key) => { |
199 | | - group.complete(); |
200 | | - }); |
201 | | - |
202 | | - groups.clear(); |
203 | | - } |
204 | | - this.destination.complete(); |
205 | | - } |
206 | | - |
207 | | - removeGroup(key: K): void { |
208 | | - this.groups.delete(key); |
209 | | - } |
210 | | - |
211 | | - unsubscribe() { |
212 | | - if (!this.closed) { |
213 | | - this.attemptedToUnsubscribe = true; |
214 | | - if (this.count === 0) { |
215 | | - super.unsubscribe(); |
216 | | - } |
217 | | - } |
218 | | - } |
219 | | -} |
220 | | - |
221 | | -/** |
222 | | - * We need this JSDoc comment for affecting ESDoc. |
223 | | - * @ignore |
224 | | - * @extends {Ignored} |
225 | | - */ |
226 | | -class GroupDurationSubscriber<K, T> extends Subscriber<T> { |
227 | | - constructor(private key: K, |
228 | | - private group: Subject<T>, |
229 | | - private parent: GroupBySubscriber<any, K, T>) { |
230 | | - super(group); |
231 | | - } |
232 | | - |
233 | | - protected _next(value: T): void { |
234 | | - this.complete(); |
235 | | - } |
236 | | - |
237 | | - protected _unsubscribe() { |
238 | | - const { parent, key } = this; |
239 | | - this.key = this.parent = null; |
240 | | - if (parent) { |
241 | | - parent.removeGroup(key); |
242 | | - } |
243 | | - } |
244 | | -} |
245 | | - |
246 | | -/** |
247 | | - * An Observable representing values belonging to the same group represented by |
248 | | - * a common key. The values emitted by a GroupedObservable come from the source |
249 | | - * Observable. The common key is available as the field `key` on a |
250 | | - * GroupedObservable instance. |
251 | | - * |
252 | | - * @class GroupedObservable<K, T> |
253 | | - */ |
254 | | -export class GroupedObservable<K, T> extends Observable<T> { |
255 | | - constructor(public key: K, |
256 | | - private groupSubject: Subject<T>, |
257 | | - private refCountSubscription?: RefCountSubscription) { |
258 | | - super(); |
259 | | - } |
260 | | - |
261 | | - protected _subscribe(subscriber: Subscriber<T>) { |
262 | | - const subscription = new Subscription(); |
263 | | - const {refCountSubscription, groupSubject} = this; |
264 | | - if (refCountSubscription && !refCountSubscription.closed) { |
265 | | - subscription.add(new InnerRefCountSubscription(refCountSubscription)); |
266 | | - } |
267 | | - subscription.add(groupSubject.subscribe(subscriber)); |
268 | | - return subscription; |
269 | | - } |
270 | | -} |
271 | | - |
272 | | -/** |
273 | | - * We need this JSDoc comment for affecting ESDoc. |
274 | | - * @ignore |
275 | | - * @extends {Ignored} |
276 | | - */ |
277 | | -class InnerRefCountSubscription extends Subscription { |
278 | | - constructor(private parent: RefCountSubscription) { |
279 | | - super(); |
280 | | - parent.count++; |
281 | | - } |
282 | | - |
283 | | - unsubscribe() { |
284 | | - const parent = this.parent; |
285 | | - if (!parent.closed && !this.closed) { |
286 | | - super.unsubscribe(); |
287 | | - parent.count -= 1; |
288 | | - if (parent.count === 0 && parent.attemptedToUnsubscribe) { |
289 | | - parent.unsubscribe(); |
290 | | - } |
291 | | - } |
292 | | - } |
| 84 | + return higherOrder(keySelector, elementSelector as any, durationSelector, subjectSelector)(this); |
293 | 85 | } |
0 commit comments