Skip to content

Merge FlowableEmitter.BackpressureMode into BackpressureStrategy #4729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/main/java/io/reactivex/BackpressureStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,26 @@
*/
public enum BackpressureStrategy {
/**
* Buffer all values (unbounded) until there is a downstream demand for it.
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
NONE,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drop the value if there is no current demand for it from the downstream.
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Have a latest value always available and overwrite it with more recent ones
* if there is no demand for it from the downstream.
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
*
* emitter.setCancellable(c::close);
*
* }, BackpressureMode.BUFFER);
* }, BackpressureStrategy.BUFFER);
* </code></pre>
* <p>
* You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The
Expand All @@ -1542,12 +1542,12 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
* @param mode the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
* @return the new Flowable instance
* @see FlowableOnSubscribe
* @see FlowableEmitter.BackpressureMode
* @see BackpressureStrategy
* @see Cancellable
*/
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, FlowableEmitter.BackpressureMode mode) {
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
Expand Down
29 changes: 0 additions & 29 deletions src/main/java/io/reactivex/FlowableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,4 @@ public interface FlowableEmitter<T> extends Emitter<T> {
* @return the serialized FlowableEmitter
*/
FlowableEmitter<T> serialize();

/**
* Options to handle backpressure in the emitter.
*/
enum BackpressureMode {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
NONE,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
}
17 changes: 11 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.*;
import java.util.concurrent.*;

import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError;
import org.reactivestreams.Publisher;

import io.reactivex.annotations.*;
Expand Down Expand Up @@ -11707,12 +11708,16 @@ public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
Flowable<T> o = new FlowableFromObservable<T>(this);

switch (strategy) {
case DROP:
return o.onBackpressureDrop();
case LATEST:
return o.onBackpressureLatest();
default:
return o.onBackpressureBuffer();
case DROP:
return o.onBackpressureDrop();
case LATEST:
return o.onBackpressureLatest();
case NONE:
return o;
case ERROR:
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(o));
default:
return o.onBackpressureBuffer();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public final class FlowableCreate<T> extends Flowable<T> {

final FlowableOnSubscribe<T> source;

final FlowableEmitter.BackpressureMode backpressure;
final BackpressureStrategy backpressure;

public FlowableCreate(FlowableOnSubscribe<T> source, FlowableEmitter.BackpressureMode backpressure) {
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicLong;

public final class FlowableOnBackpressureError<T> extends AbstractFlowableWithUpstream<T, T> {


public FlowableOnBackpressureError(Publisher<T> source) {
super(source);
}


@Override
protected void subscribeActual(Subscriber<? super T> s) {
this.source.subscribe(new BackpressureErrorSubscriber<T>(s));
}

static final class BackpressureErrorSubscriber<T>
extends AtomicLong implements Subscriber<T>, Subscription {

final Subscriber<? super T> actual;
Subscription s;
boolean done;

BackpressureErrorSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}
long r = get();
if (r != 0L) {
actual.onNext(t);
if (r != Long.MAX_VALUE) {
decrementAndGet();
}
} else {
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
}
}

@Override
public void cancel() {
s.cancel();
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/io/reactivex/BackpressureEnumTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void backpressureOverflowStrategy() {

@Test
public void backpressureStrategy() {
assertEquals(3, BackpressureStrategy.values().length);
assertEquals(5, BackpressureStrategy.values().length);

assertNotNull(BackpressureStrategy.valueOf("BUFFER"));
}
Expand Down
Loading