Skip to content

Commit

Permalink
2.x: Add Completable.hide() (#4973)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanniktech authored and akarnokd committed Jan 8, 2017
1 parent a3ccbf9 commit bcdfb13
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,24 @@ public final <T> Flowable<T> startWith(Publisher<T> other) {
return this.<T>toFlowable().startWith(other);
}

/**
* Hides the identity of this Completable and its Disposable.
* <p>Allows preventing certain identity-based
* optimizations (fusion).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code hide} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
* @since 2.0.5 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable hide() {
return RxJavaPlugins.onAssembly(new CompletableHide(this));
}

/**
* Subscribes to this CompletableConsumable and returns a Disposable which can be used to cancel
* the subscription.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.completable;

import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* Hides the identity of the upstream Completable and its Disposable sent through onSubscribe.
*/
public final class CompletableHide extends Completable {

final CompletableSource source;

public CompletableHide(CompletableSource source) {
this.source = source;
}

@Override
protected void subscribeActual(CompletableObserver observer) {
source.subscribe(new HideCompletableObserver(observer));
}

static final class HideCompletableObserver implements CompletableObserver, Disposable {

final CompletableObserver actual;

Disposable d;

HideCompletableObserver(CompletableObserver actual) {
this.actual = actual;
}

@Override
public void dispose() {
d.dispose();
d = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
actual.onComplete();
}
}
}
23 changes: 23 additions & 0 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.operators.completable.CompletableToFlowable;
import io.reactivex.internal.operators.maybe.MaybeToFlowable;
import io.reactivex.internal.operators.single.SingleToFlowable;
import io.reactivex.internal.subscriptions.BooleanSubscription;
Expand Down Expand Up @@ -1962,6 +1963,28 @@ public static <T, U> void checkDisposedMaybe(Function<Maybe<T>, ? extends MaybeS
assertFalse("Dispose not propagated!", pp.hasSubscribers());
}

/**
* Check if the operator applied to a Completable source propagates dispose properly.
* @param composer the function to apply an operator to the provided Completable source
*/
public static void checkDisposedCompletable(Function<Completable, ? extends CompletableSource> composer) {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

try {
new CompletableToFlowable<Integer>(composer.apply(pp.ignoreElements())).subscribe(ts);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}

assertTrue("Not subscribed to source!", pp.hasSubscribers());

ts.cancel();

assertFalse("Dispose not propagated!", pp.hasSubscribers());
}

/**
* Check if the operator applied to a Maybe source propagates dispose properly.
* @param <T> the source value type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.completable;

import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.CompletableSubject;
import org.junit.Test;

import static org.junit.Assert.assertFalse;

public class CompletableHideTest {

@Test
public void never() {
Completable.never()
.hide()
.test()
.assertNotComplete()
.assertNoErrors();
}

@Test
public void complete() {
Completable.complete()
.hide()
.test()
.assertResult();
}

@Test
public void error() {
Completable.error(new TestException())
.hide()
.test()
.assertFailure(TestException.class);
}

@Test
public void hidden() {
assertFalse(CompletableSubject.create().hide() instanceof CompletableSubject);
}

@Test
public void dispose() {
TestHelper.checkDisposedCompletable(new Function<Completable, CompletableSource>() {
@Override
public CompletableSource apply(Completable m) throws Exception {
return m.hide();
}
});
}

@Test
public void isDisposed() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestHelper.checkDisposed(pp.ignoreElements().hide());
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeCompletable(new Function<Completable, Completable>() {
@Override
public Completable apply(Completable f) throws Exception {
return f.hide();
}
});
}
}

0 comments on commit bcdfb13

Please sign in to comment.