Skip to content

Commit

Permalink
fix(skipUntil): properly manages notifier subscription
Browse files Browse the repository at this point in the history
- No longer waits until notifier is complete to complete resulting observable
- Unsubs from notifier after first notification
- Updates tests to be correct
- Corrects some grammar in test descriptions

fixes ReactiveX#1886
  • Loading branch information
benlesh committed Apr 2, 2018
1 parent df6ad6d commit a540751
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 60 deletions.
83 changes: 41 additions & 42 deletions spec/operators/skipUntil-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,32 @@ declare function asDiagram(arg: string): Function;
const Observable = Rx.Observable;

/** @test {skipUntil} */
describe('Observable.prototype.skipUntil', () => {
describe('skipUntil', () => {
asDiagram('skipUntil')('should skip values until another observable notifies', () => {
const e1 = hot('--a--b--c--d--e----|');
const e1subs = '^ !';
const skip = hot('---------x------| ');
const skipSubs = '^ ! ';
const skipSubs = '^ ! ';
const expected = ('-----------d--e----|');

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should emit element only after another observable emits', () => {
it('should emit elements after notifer emits', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
const skip = hot('-----------x----| ');
const skipSubs = '^ ! ';
const expected = ('--------------e--|');
const skip = hot('---------x----| ');
const skipSubs = '^ ! ';
const expected = ('-----------d--e--|');

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip value and raises error until another observable raises error', () => {
it('should raise an error if notifier throws and source is hot', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ ! ';
const skip = hot('-------------# ');
Expand All @@ -43,11 +43,11 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element when another observable does not emit and completes early', () => {
it('should skip all elements when notifier does not emit and completes early', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
const skip = hot('------------| ');
const skipSubs = '^ ! ';
const skip = hot('------------|');
const skipSubs = '^ !';
const expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
Expand Down Expand Up @@ -86,7 +86,7 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element when another observable is empty', () => {
it('should skip all elements when notifier is empty', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
const skip = cold('|');
Expand All @@ -98,7 +98,7 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should keep subscription to source, to wait for its eventual complete', () => {
it('should keep subscription to source, to wait for its eventual completion', () => {
const e1 = hot('------------------------------|');
const e1subs = '^ !';
const skip = hot('-------| ');
Expand All @@ -110,43 +110,31 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not complete if source observable does not complete', () => {
it('should not complete if hot source observable does not complete', () => {
const e1 = hot('-');
const e1subs = '^';
const skip = hot('-------------x--|');
const skipSubs = '^ !';
const skipSubs = '^ ! ';
const expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not complete if source observable never completes', () => {
it('should not complete if cold source observable never completes', () => {
const e1 = cold( '-');
const e1subs = '^';
const skip = hot('-------------x--|');
const skipSubs = '^ !';
const skipSubs = '^ ! ';
const expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should raise error if source does not completes when another observable raises error', () => {
const e1 = hot('-');
const e1subs = '^ !';
const skip = hot('-------------#');
const skipSubs = '^ !';
const expected = '-------------#';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should raise error if source never completes when another observable raises error', () => {
it('should raise error if cold source is never and notifier errors', () => {
const e1 = cold( '-');
const e1subs = '^ !';
const skip = hot('-------------#');
Expand All @@ -158,43 +146,43 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element and does not complete when another observable never completes', () => {
it('should skip all elements and complete if notifier is cold never', () => {
const e1 = hot( '--a--b--c--d--e--|');
const e1subs = '^ !';
const skip = cold('-');
const skipSubs = '^ !';
const expected = '-';
const expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element and does not complete when another observable does not completes', () => {
it('should skip all elements and complete if notifier is a hot never', () => {
const e1 = hot('--a--b--c--d--e--|');
const e1subs = '^ !';
const skip = hot('-');
const skipSubs = '^ !';
const expected = '-';
const expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element and does not complete when another observable completes after source', () => {
const e1 = hot('--a--b--c--d--e--|');
it('should skip all elements and complete, even if notifier would not complete until later', () => {
const e1 = hot('^-a--b--c--d--e--|');
const e1subs = '^ !';
const skip = hot('------------------------|');
const skip = hot('^-----------------------|');
const skipSubs = '^ !';
const expected = '------------------';
const expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not completes if source does not completes when another observable does not emit', () => {
it('should not complete if source does not complete if notifier completes without emission', () => {
const e1 = hot('-');
const e1subs = '^';
const skip = hot('--------------|');
Expand All @@ -206,7 +194,7 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should not completes if source and another observable both does not complete', () => {
it('should not complete if source and notifier are both hot never', () => {
const e1 = hot('-');
const e1subs = '^';
const skip = hot('-');
Expand All @@ -218,12 +206,12 @@ describe('Observable.prototype.skipUntil', () => {
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
});

it('should skip all element when another observable unsubscribed early before emit', () => {
it('should skip skip all elements if notifier is unsubscribed explicitly before the notifier emits', () => {
const e1 = hot( '--a--b--c--d--e--|');
const e1subs = ['^ !',
'^ !']; // for the explicit subscribe some lines below
'^ !']; // for the explicit subscribe some lines below
const skip = new Rx.Subject();
const expected = '-';
const expected = '-----------------|';

e1.subscribe((x: string) => {
if (x === 'd' && !skip.closed) {
Expand All @@ -236,4 +224,15 @@ describe('Observable.prototype.skipUntil', () => {
expectObservable(e1.skipUntil(skip)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe the notifier after its first nexted value', () => {
const source = hot('-^-o---o---o---o---o---o---|');
const notifier = hot('-^--------n--n--n--n--n--n-|');
const nSubs = '^ !';
const expected = '-^---------o---o---o---o---|';
const result = source.skipUntil(notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(notifier.subscriptions).toBe(nSubs);
});
});
30 changes: 12 additions & 18 deletions src/internal/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { Subscription } from '../Subscription';

/**
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
Expand All @@ -26,8 +27,12 @@ class SkipUntilOperator<T> implements Operator<T, T> {
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
call(destination: Subscriber<T>, source: any): TeardownLogic {
const { notifier } = this;
const innerSubscription = new Subscription();
const subscriber = new SkipUntilSubscriber(destination, innerSubscription);
innerSubscription.add(subscribeToResult(subscriber, notifier));
return source.subscribe(subscriber);
}
}

Expand All @@ -39,12 +44,11 @@ class SkipUntilOperator<T> implements Operator<T, T> {
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {

private hasValue: boolean = false;
private isInnerStopped: boolean = false;

constructor(destination: Subscriber<any>,
notifier: Observable<any>) {
private innerSubscription: Subscription) {
super(destination);
this.add(subscribeToResult(this, notifier));
this.add(innerSubscription);
}

protected _next(value: T) {
Expand All @@ -53,24 +57,14 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

protected _complete() {
if (this.isInnerStopped) {
super._complete();
} else {
this.unsubscribe();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.hasValue = true;
this.innerSubscription.unsubscribe();
}

notifyComplete(): void {
this.isInnerStopped = true;
if (this.isStopped) {
super._complete();
}
notifyComplete() {
/* do nothing */
}
}

0 comments on commit a540751

Please sign in to comment.