Skip to content

Commit 86992c6

Browse files
Andre Medeirosbenlesh
authored andcommitted
fix(groupBy): fix bugs with groupBy
Fix groupBy in order to pass tests. Also refactor some code related to groupBy, introducing groupBy-support.ts file. Most fixes are related to making inner Observables groups be hot which continue executing even if the outer Observable was unsubscribed. Another fix makes the outer Observable throw an error if the elementSelector function throws. The most significant refactor replaces GroupSubject with GroupedObservable, to resemble the RxJS legacy API, and disallow using Observer methods in the Subject. Relates to issue #375.
1 parent 893c7fe commit 86992c6

File tree

6 files changed

+107
-42
lines changed

6 files changed

+107
-42
lines changed

src/CoreOperators.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import Observable from './Observable';
22
import Scheduler from './Scheduler';
33
import ConnectableObservable from './observables/ConnectableObservable';
44
import Subject from './Subject'
5-
import GroupSubject from './subjects/GroupSubject';
5+
import {GroupedObservable} from './operators/groupBy-support';
66

77
export interface CoreOperators<T> {
88
buffer?: <T>(closingNotifier: Observable<any>) => Observable<T[]>;
@@ -30,7 +30,7 @@ export interface CoreOperators<T> {
3030
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean, resultSelector?: (value:T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<R>;
3131
flatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
3232
flatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
33-
groupBy?: <T, R>(keySelector: (value:T) => string, durationSelector?: (group:GroupSubject<R>) => Observable<any>, elementSelector?: (value:T) => R) => Observable<R>;
33+
groupBy?: <T, R>(keySelector: (value:T) => string, elementSelector?: (value:T) => R, durationSelector?: (group: GroupedObservable<R>) => Observable<any>) => Observable<GroupedObservable<R>>;
3434
ignoreElements?: () => Observable<T>;
3535
last?: <R>(predicate?: (value: T, index:number) => boolean, resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<T>;
3636
every?: (predicate: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;

src/Rx.KitchenSink.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ observableProto.finally = _finally;
150150
import first from './operators/first';
151151
observableProto.first = first;
152152

153-
import groupBy from './operators/groupBy';
153+
import {groupBy} from './operators/groupBy';
154154
observableProto.groupBy = groupBy;
155155

156156
import ignoreElements from './operators/ignoreElements';

src/Rx.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ observableProto.finally = _finally;
127127
import first from './operators/first';
128128
observableProto.first = first;
129129

130-
import groupBy from './operators/groupBy';
130+
import {groupBy} from './operators/groupBy';
131131
observableProto.groupBy = groupBy;
132132

133133
import ignoreElements from './operators/ignoreElements';

src/operators/groupBy-support.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import Subscription from '../Subscription';
2+
import Subject from '../Subject';
3+
import Subscriber from '../Subscriber';
4+
import Observable from '../Observable';
5+
6+
export class RefCountSubscription<T> extends Subscription<T> {
7+
primary: Subscription<T>;
8+
attemptedToUnsubscribePrimary: boolean = false;
9+
count: number = 0;
10+
11+
constructor() {
12+
super();
13+
}
14+
15+
setPrimary(subscription: Subscription<T>) {
16+
this.primary = subscription;
17+
}
18+
19+
unsubscribe() {
20+
if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) {
21+
this.attemptedToUnsubscribePrimary = true;
22+
if (this.count === 0) {
23+
super.unsubscribe();
24+
this.primary.unsubscribe();
25+
}
26+
}
27+
}
28+
}
29+
30+
export class GroupedObservable<T> extends Observable<T> {
31+
constructor(public key: string,
32+
private groupSubject: Subject<T>,
33+
private refCountSubscription: RefCountSubscription<T>) {
34+
super();
35+
}
36+
37+
_subscribe(subscriber: Subscriber<T>) {
38+
const subscription = new Subscription();
39+
if (!this.refCountSubscription.isUnsubscribed) {
40+
subscription.add(new InnerRefCountSubscription(this.refCountSubscription));
41+
}
42+
subscription.add(this.groupSubject.subscribe(subscriber));
43+
return subscription;
44+
}
45+
}
46+
47+
export class InnerRefCountSubscription<T> extends Subscription<T> {
48+
constructor(private parent: RefCountSubscription<T>) {
49+
super();
50+
parent.count++;
51+
}
52+
53+
unsubscribe() {
54+
if (!this.parent.isUnsubscribed && !this.isUnsubscribed) {
55+
super.unsubscribe();
56+
this.parent.count--;
57+
if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) {
58+
this.parent.unsubscribe();
59+
this.parent.primary.unsubscribe();
60+
}
61+
}
62+
}
63+
}
64+

src/operators/groupBy.ts

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,52 @@
11
import Operator from '../Operator';
22
import Observer from '../Observer';
3+
import Subscription from '../Subscription';
34
import Subscriber from '../Subscriber';
45
import Observable from '../Observable';
56
import Subject from '../Subject';
67
import Map from '../util/Map';
78
import FastMap from '../util/FastMap';
8-
import GroupSubject from '../subjects/GroupSubject';
9+
import {RefCountSubscription, GroupedObservable, InnerRefCountSubscription} from './groupBy-support';
910

1011
import tryCatch from '../util/tryCatch';
1112
import {errorObject} from '../util/errorObject';
1213
import bindCallback from '../util/bindCallback';
1314

14-
export default function groupBy<T, R>(keySelector: (value: T) => string,
15-
elementSelector?: (value: T) => R,
16-
durationSelector?: (grouped: GroupSubject<R>) => Observable<any>): Observable<GroupSubject<R>> {
17-
return this.lift(new GroupByOperator<T, R>(keySelector, durationSelector, elementSelector));
15+
export function groupBy<T, R>(keySelector: (value: T) => string,
16+
elementSelector?: (value: T) => R,
17+
durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>): GroupByObservable<T, R> {
18+
return new GroupByObservable<T, R>(this, keySelector, elementSelector, durationSelector);
1819
}
1920

20-
class GroupByOperator<T, R> implements Operator<T, R> {
21-
constructor(private keySelector: (value: T) => string,
22-
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
23-
private elementSelector?: (value: T) => R) {
21+
export class GroupByObservable<T, R> extends Observable<GroupedObservable<R>> {
22+
constructor(public source: Observable<T>,
23+
private keySelector: (value: T) => string,
24+
private elementSelector?: (value: T) => R,
25+
private durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>) {
26+
super();
2427
}
2528

26-
call(subscriber: Subscriber<R>): Subscriber<T> {
27-
return new GroupBySubscriber<T, R>(
28-
subscriber, this.keySelector, this.durationSelector, this.elementSelector
29+
_subscribe(subscriber) {
30+
const refCountSubscription = new RefCountSubscription();
31+
const groupBySubscriber = new GroupBySubscriber(
32+
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
2933
);
34+
refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber));
35+
return refCountSubscription;
3036
}
3137
}
3238

3339
class GroupBySubscriber<T, R> extends Subscriber<T> {
3440
private groups = null;
3541

3642
constructor(destination: Subscriber<R>,
43+
private refCountSubscription: RefCountSubscription<T>,
3744
private keySelector: (value: T) => string,
38-
private durationSelector?: (grouped: GroupSubject<R>) => Observable<any>,
39-
private elementSelector?: (value: T) => R) {
40-
super(destination);
45+
private elementSelector?: (value: T) => R,
46+
private durationSelector?: (grouped: GroupedObservable<R>) => Observable<any>) {
47+
super();
48+
this.destination = destination;
49+
this.add(destination);
4150
}
4251

4352
_next(x: T) {
@@ -53,27 +62,28 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
5362
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
5463
}
5564

56-
let group: GroupSubject<R> = groups.get(key);
65+
let group: Subject<R> = groups.get(key);
5766

5867
if (!group) {
59-
groups.set(key, group = new GroupSubject(key));
68+
groups.set(key, group = new Subject());
69+
let groupedObservable = new GroupedObservable<R>(key, group, this.refCountSubscription);
6070

6171
if (durationSelector) {
62-
let duration = tryCatch(durationSelector)(group);
72+
let duration = tryCatch(durationSelector)(groupedObservable);
6373
if (duration === errorObject) {
6474
this.error(duration.e);
6575
} else {
66-
this.add(duration._subscribe(new GroupDurationSubscriber(group, this)));
76+
this.add(duration._subscribe(new GroupDurationSubscriber(key, group, this)));
6777
}
6878
}
6979

70-
this.destination.next(group);
80+
this.destination.next(groupedObservable);
7181
}
7282

7383
if (elementSelector) {
7484
let value = tryCatch(elementSelector)(x);
7585
if (value === errorObject) {
76-
group.error(value.e);
86+
this.error(value.e);
7787
} else {
7888
group.next(value);
7989
}
@@ -111,26 +121,24 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
111121
}
112122

113123
class GroupDurationSubscriber<T> extends Subscriber<T> {
114-
constructor(private group: GroupSubject<T>,
124+
constructor(private key: string,
125+
private group: Subject<T>,
115126
private parent: GroupBySubscriber<any, T>) {
116127
super(null);
117128
}
118129

119130
_next(value: T) {
120-
const group = this.group;
121-
group.complete();
122-
this.parent.removeGroup(group.key);
131+
this.group.complete();
132+
this.parent.removeGroup(this.key);
123133
}
124134

125135
_error(err: any) {
126-
const group = this.group;
127-
group.error(err);
128-
this.parent.removeGroup(group.key);
136+
this.group.error(err);
137+
this.parent.removeGroup(this.key);
129138
}
130139

131140
_complete() {
132-
const group = this.group;
133-
group.complete();
134-
this.parent.removeGroup(group.key);
141+
this.group.complete();
142+
this.parent.removeGroup(this.key);
135143
}
136144
}

src/subjects/GroupSubject.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)