|
1 |
| -import { Subscriber } from '../Subscriber'; |
2 |
| -import { Subscription } from '../Subscription'; |
| 1 | + |
3 | 2 | import { Observable } from '../Observable';
|
4 |
| -import { Operator } from '../Operator'; |
5 | 3 | import { Subject } from '../Subject';
|
6 |
| -import { Map } from '../util/Map'; |
7 |
| -import { FastMap } from '../util/FastMap'; |
| 4 | +import { groupBy as higherOrder, GroupedObservable } from '../operators/groupBy'; |
8 | 5 |
|
9 | 6 | /* tslint:disable:max-line-length */
|
10 | 7 | export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>;
|
@@ -84,210 +81,5 @@ export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) =>
|
84 | 81 | elementSelector?: ((value: T) => R) | void,
|
85 | 82 | durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
|
86 | 83 | subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>> {
|
87 |
| - return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); |
88 |
| -} |
89 |
| - |
90 |
| -export interface RefCountSubscription { |
91 |
| - count: number; |
92 |
| - unsubscribe: () => void; |
93 |
| - closed: boolean; |
94 |
| - attemptedToUnsubscribe: boolean; |
95 |
| -} |
96 |
| - |
97 |
| -class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> { |
98 |
| - constructor(private keySelector: (value: T) => K, |
99 |
| - private elementSelector?: ((value: T) => R) | void, |
100 |
| - private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, |
101 |
| - private subjectSelector?: () => Subject<R>) { |
102 |
| - } |
103 |
| - |
104 |
| - call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any { |
105 |
| - return source.subscribe(new GroupBySubscriber( |
106 |
| - subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector |
107 |
| - )); |
108 |
| - } |
109 |
| -} |
110 |
| - |
111 |
| -/** |
112 |
| - * We need this JSDoc comment for affecting ESDoc. |
113 |
| - * @ignore |
114 |
| - * @extends {Ignored} |
115 |
| - */ |
116 |
| -class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription { |
117 |
| - private groups: Map<K, Subject<T|R>> = null; |
118 |
| - public attemptedToUnsubscribe: boolean = false; |
119 |
| - public count: number = 0; |
120 |
| - |
121 |
| - constructor(destination: Subscriber<GroupedObservable<K, R>>, |
122 |
| - private keySelector: (value: T) => K, |
123 |
| - private elementSelector?: ((value: T) => R) | void, |
124 |
| - private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, |
125 |
| - private subjectSelector?: () => Subject<R>) { |
126 |
| - super(destination); |
127 |
| - } |
128 |
| - |
129 |
| - protected _next(value: T): void { |
130 |
| - let key: K; |
131 |
| - try { |
132 |
| - key = this.keySelector(value); |
133 |
| - } catch (err) { |
134 |
| - this.error(err); |
135 |
| - return; |
136 |
| - } |
137 |
| - |
138 |
| - this._group(value, key); |
139 |
| - } |
140 |
| - |
141 |
| - private _group(value: T, key: K) { |
142 |
| - let groups = this.groups; |
143 |
| - |
144 |
| - if (!groups) { |
145 |
| - groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); |
146 |
| - } |
147 |
| - |
148 |
| - let group = groups.get(key); |
149 |
| - |
150 |
| - let element: R; |
151 |
| - if (this.elementSelector) { |
152 |
| - try { |
153 |
| - element = this.elementSelector(value); |
154 |
| - } catch (err) { |
155 |
| - this.error(err); |
156 |
| - } |
157 |
| - } else { |
158 |
| - element = <any>value; |
159 |
| - } |
160 |
| - |
161 |
| - if (!group) { |
162 |
| - group = this.subjectSelector ? this.subjectSelector() : new Subject<R>(); |
163 |
| - groups.set(key, group); |
164 |
| - const groupedObservable = new GroupedObservable(key, group, this); |
165 |
| - this.destination.next(groupedObservable); |
166 |
| - if (this.durationSelector) { |
167 |
| - let duration: any; |
168 |
| - try { |
169 |
| - duration = this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group)); |
170 |
| - } catch (err) { |
171 |
| - this.error(err); |
172 |
| - return; |
173 |
| - } |
174 |
| - this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); |
175 |
| - } |
176 |
| - } |
177 |
| - |
178 |
| - if (!group.closed) { |
179 |
| - group.next(element); |
180 |
| - } |
181 |
| - } |
182 |
| - |
183 |
| - protected _error(err: any): void { |
184 |
| - const groups = this.groups; |
185 |
| - if (groups) { |
186 |
| - groups.forEach((group, key) => { |
187 |
| - group.error(err); |
188 |
| - }); |
189 |
| - |
190 |
| - groups.clear(); |
191 |
| - } |
192 |
| - this.destination.error(err); |
193 |
| - } |
194 |
| - |
195 |
| - protected _complete(): void { |
196 |
| - const groups = this.groups; |
197 |
| - if (groups) { |
198 |
| - groups.forEach((group, key) => { |
199 |
| - group.complete(); |
200 |
| - }); |
201 |
| - |
202 |
| - groups.clear(); |
203 |
| - } |
204 |
| - this.destination.complete(); |
205 |
| - } |
206 |
| - |
207 |
| - removeGroup(key: K): void { |
208 |
| - this.groups.delete(key); |
209 |
| - } |
210 |
| - |
211 |
| - unsubscribe() { |
212 |
| - if (!this.closed) { |
213 |
| - this.attemptedToUnsubscribe = true; |
214 |
| - if (this.count === 0) { |
215 |
| - super.unsubscribe(); |
216 |
| - } |
217 |
| - } |
218 |
| - } |
219 |
| -} |
220 |
| - |
221 |
| -/** |
222 |
| - * We need this JSDoc comment for affecting ESDoc. |
223 |
| - * @ignore |
224 |
| - * @extends {Ignored} |
225 |
| - */ |
226 |
| -class GroupDurationSubscriber<K, T> extends Subscriber<T> { |
227 |
| - constructor(private key: K, |
228 |
| - private group: Subject<T>, |
229 |
| - private parent: GroupBySubscriber<any, K, T>) { |
230 |
| - super(group); |
231 |
| - } |
232 |
| - |
233 |
| - protected _next(value: T): void { |
234 |
| - this.complete(); |
235 |
| - } |
236 |
| - |
237 |
| - protected _unsubscribe() { |
238 |
| - const { parent, key } = this; |
239 |
| - this.key = this.parent = null; |
240 |
| - if (parent) { |
241 |
| - parent.removeGroup(key); |
242 |
| - } |
243 |
| - } |
244 |
| -} |
245 |
| - |
246 |
| -/** |
247 |
| - * An Observable representing values belonging to the same group represented by |
248 |
| - * a common key. The values emitted by a GroupedObservable come from the source |
249 |
| - * Observable. The common key is available as the field `key` on a |
250 |
| - * GroupedObservable instance. |
251 |
| - * |
252 |
| - * @class GroupedObservable<K, T> |
253 |
| - */ |
254 |
| -export class GroupedObservable<K, T> extends Observable<T> { |
255 |
| - constructor(public key: K, |
256 |
| - private groupSubject: Subject<T>, |
257 |
| - private refCountSubscription?: RefCountSubscription) { |
258 |
| - super(); |
259 |
| - } |
260 |
| - |
261 |
| - protected _subscribe(subscriber: Subscriber<T>) { |
262 |
| - const subscription = new Subscription(); |
263 |
| - const {refCountSubscription, groupSubject} = this; |
264 |
| - if (refCountSubscription && !refCountSubscription.closed) { |
265 |
| - subscription.add(new InnerRefCountSubscription(refCountSubscription)); |
266 |
| - } |
267 |
| - subscription.add(groupSubject.subscribe(subscriber)); |
268 |
| - return subscription; |
269 |
| - } |
270 |
| -} |
271 |
| - |
272 |
| -/** |
273 |
| - * We need this JSDoc comment for affecting ESDoc. |
274 |
| - * @ignore |
275 |
| - * @extends {Ignored} |
276 |
| - */ |
277 |
| -class InnerRefCountSubscription extends Subscription { |
278 |
| - constructor(private parent: RefCountSubscription) { |
279 |
| - super(); |
280 |
| - parent.count++; |
281 |
| - } |
282 |
| - |
283 |
| - unsubscribe() { |
284 |
| - const parent = this.parent; |
285 |
| - if (!parent.closed && !this.closed) { |
286 |
| - super.unsubscribe(); |
287 |
| - parent.count -= 1; |
288 |
| - if (parent.count === 0 && parent.attemptedToUnsubscribe) { |
289 |
| - parent.unsubscribe(); |
290 |
| - } |
291 |
| - } |
292 |
| - } |
| 84 | + return higherOrder(keySelector, elementSelector as any, durationSelector, subjectSelector)(this); |
293 | 85 | }
|
0 commit comments