Skip to content

Commit

Permalink
2.x: add doAfterNext & doAfterSuccess to the other types (#4835)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Nov 11, 2016
1 parent 715d71d commit bbae4a5
Show file tree
Hide file tree
Showing 9 changed files with 887 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,25 @@ public final Maybe<T> delaySubscription(long delay, TimeUnit unit, Scheduler sch
return delaySubscription(Flowable.timer(delay, unit, scheduler));
}

/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
* <p>Note that the {@code onAfterNext} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Maybe instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Maybe<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null");
return RxJavaPlugins.onAssembly(new MaybeDoAfterSuccess<T>(this, onAfterSuccess));
}

/**
* Registers an {@link Action} to be called when this Maybe invokes either
* {@link MaybeObserver#onComplete onSuccess},
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6416,6 +6416,27 @@ public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
}

/**
* Calls the specified consumer with the current item after this item has been emitted to the downstream.
* <p>Note that the {@code onAfterNext} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterNext} does not operate by default on a particular {@link Scheduler}.</dd>
* <td><b>Operator-fusion:</b></dt>
* <dd>This operator supports boundary-limited synchronous or asynchronous queue-fusion.</dd>
* </dl>
* @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Observable instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext) {
ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
return RxJavaPlugins.onAssembly(new ObservableDoAfterNext<T>(this, onAfterNext));
}

/**
* Registers an {@link Action} to be called when this ObservableSource invokes either
* {@link Observer#onComplete onComplete} or {@link Observer#onError onError}.
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,25 @@ public final <U> Single<T> delaySubscription(long time, TimeUnit unit, Scheduler
return delaySubscription(Observable.timer(time, unit, scheduler));
}

/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
* <p>Note that the {@code doAfterSuccess} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Single instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Single<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null");
return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess<T>(this, onAfterSuccess));
}

/**
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
* the downstream.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class MaybeDoAfterSuccess<T> extends AbstractMaybeWithUpstream<T, T> {

final Consumer<? super T> onAfterSuccess;

public MaybeDoAfterSuccess(MaybeSource<T> source, Consumer<? super T> onAfterSuccess) {
super(source);
this.onAfterSuccess = onAfterSuccess;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterSuccess));
}

static final class DoAfterObserver<T> implements MaybeObserver<T>, Disposable {

final MaybeObserver<? super T> actual;

final Consumer<? super T> onAfterSuccess;

Disposable d;

DoAfterObserver(MaybeObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.actual = actual;
this.onAfterSuccess = onAfterSuccess;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T t) {
actual.onSuccess(t);

try {
onAfterSuccess.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// remember, onSuccess is a terminal event and we can't call onError
RxJavaPlugins.onError(ex);
}
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.observers.BasicFuseableObserver;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class ObservableDoAfterNext<T> extends AbstractObservableWithUpstream<T, T> {

final Consumer<? super T> onAfterNext;

public ObservableDoAfterNext(ObservableSource<T> source, Consumer<? super T> onAfterNext) {
super(source);
this.onAfterNext = onAfterNext;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterNext));
}

static final class DoAfterObserver<T> extends BasicFuseableObserver<T, T> {

final Consumer<? super T> onAfterNext;

DoAfterObserver(Observer<? super T> actual, Consumer<? super T> onAfterNext) {
super(actual);
this.onAfterNext = onAfterNext;
}

@Override
public void onNext(T t) {
actual.onNext(t);

if (sourceMode == NONE) {
try {
onAfterNext.accept(t);
} catch (Throwable ex) {
fail(ex);
}
}
}

@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Override
public T poll() throws Exception {
T v = qs.poll();
if (v != null) {
onAfterNext.accept(v);
}
return v;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class SingleDoAfterSuccess<T> extends Single<T> {

final SingleSource<T> source;

final Consumer<? super T> onAfterSuccess;

public SingleDoAfterSuccess(SingleSource<T> source, Consumer<? super T> onAfterSuccess) {
this.source = source;
this.onAfterSuccess = onAfterSuccess;
}

@Override
protected void subscribeActual(SingleObserver<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterSuccess));
}

static final class DoAfterObserver<T> implements SingleObserver<T>, Disposable {

final SingleObserver<? super T> actual;

final Consumer<? super T> onAfterSuccess;

Disposable d;

DoAfterObserver(SingleObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.actual = actual;
this.onAfterSuccess = onAfterSuccess;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T t) {
actual.onSuccess(t);

try {
onAfterSuccess.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// remember, onSuccess is a terminal event and we can't call onError
RxJavaPlugins.onError(ex);
}
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Loading

0 comments on commit bbae4a5

Please sign in to comment.