Skip to content

Commit

Permalink
Use t instead of value to allow for IDE naming (ReactiveX#4907)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jawnnypoo authored and Marius Constantin committed Mar 23, 2017
1 parent 0e19566 commit 91fd535
Showing 1 changed file with 161 additions and 99 deletions.
260 changes: 161 additions & 99 deletions src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
Expand All @@ -18,14 +18,22 @@
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;

import org.junit.*;
import org.junit.rules.TestName;
import org.reactivestreams.*;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.functions.*;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -97,17 +105,17 @@ public void testObserveOnWithSlowConsumer() {
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
incrementingIntegers(c).observeOn(Schedulers.computation()).map(
new Function<Integer, Integer>() {
@Override
public Integer apply(Integer i) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
new Function<Integer, Integer>() {
@Override
public Integer apply(Integer i) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
return i;
}
}
).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand Down Expand Up @@ -177,9 +185,9 @@ public void testMergeAsyncThenObserveOnLoop() {
incrementingIntegers(c2).subscribeOn(Schedulers.computation()));

merged
.observeOn(Schedulers.io())
.take(NUM)
.subscribe(ts);
.observeOn(Schedulers.io())
.take(NUM)
.subscribe(ts);


ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -220,13 +228,13 @@ public void testFlatMapSync() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

incrementingIntegers(c)
.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return incrementingIntegers(new AtomicInteger()).take(10);
}
})
.take(NUM).subscribe(ts);
.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return incrementingIntegers(new AtomicInteger()).take(10);
}
})
.take(NUM).subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand All @@ -244,17 +252,17 @@ public void testFlatMapAsync() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

incrementingIntegers(c)
.subscribeOn(Schedulers.computation())
.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return incrementingIntegers(new AtomicInteger())
.take(10)
.subscribeOn(Schedulers.computation());
}
}
)
.take(NUM).subscribe(ts);
.subscribeOn(Schedulers.computation())
.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return incrementingIntegers(new AtomicInteger())
.take(10)
.subscribeOn(Schedulers.computation());
}
}
)
.take(NUM).subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand Down Expand Up @@ -284,7 +292,7 @@ public Integer apply(Integer t1, Integer t2) {
});

zipped.take(NUM)
.subscribe(ts);
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand Down Expand Up @@ -435,43 +443,43 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(
new ResourceSubscriber<Integer>() {

@Override
public void onStart() {
request(100);
}
@Override
public void onStart() {
request(100);
}

@Override
public void onComplete() {
latch.countDown();
}
@Override
public void onComplete() {
latch.countDown();
}

@Override
public void onError(Throwable e) {
latch.countDown();
}
@Override
public void onError(Throwable e) {
latch.countDown();
}

@Override
public void onNext(Integer t) {
int total = totalReceived.incrementAndGet();
received.incrementAndGet();
boolean done = false;
if (total >= 2000) {
done = true;
dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
received.set(0);
if (!done) {
request(100);
@Override
public void onNext(Integer t) {
int total = totalReceived.incrementAndGet();
received.incrementAndGet();
boolean done = false;
if (total >= 2000) {
done = true;
dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
received.set(0);
if (!done) {
request(100);
}
}
if (done) {
latch.countDown();
}
}
}
if (done) {
latch.countDown();
}
}

});
});

latch.await();
System.out.println("testUserSubscriberUsingRequestAsync => Received: " + totalReceived.get() + " Emitted: " + c.get() + " Request Batches: " + batches.get());
Expand All @@ -486,18 +494,18 @@ public void testFirehoseFailsAsExpected() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

firehose(c).observeOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
return v;
}
})
.subscribe(ts);
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) {
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
return v;
}
})
.subscribe(ts);

ts.awaitTerminalEvent();
System.out.println("testFirehoseFailsAsExpected => Received: " + ts.valueCount() + " Emitted: " + c.get());
Expand Down Expand Up @@ -529,8 +537,8 @@ public void testOnBackpressureDrop() {
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(c).onBackpressureDrop()
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

Expand All @@ -555,21 +563,21 @@ public void testOnBackpressureDropWithAction() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

firehose(emitCount)
.onBackpressureDrop(new Consumer<Integer>() {
@Override
public void accept(Integer v) {
dropCount.incrementAndGet();
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) {
passCount.incrementAndGet();
}
})
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU)
.take(NUM).subscribe(ts);
.onBackpressureDrop(new Consumer<Integer>() {
@Override
public void accept(Integer v) {
dropCount.incrementAndGet();
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) {
passCount.incrementAndGet();
}
})
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU)
.take(NUM).subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand All @@ -594,7 +602,7 @@ public void testOnBackpressureDropSynchronous() {
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(c).onBackpressureDrop()
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

Expand Down Expand Up @@ -622,7 +630,7 @@ public void accept(Integer j) {
dropCount.incrementAndGet();
}
})
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

Expand Down Expand Up @@ -652,9 +660,9 @@ public boolean test(Integer t1) {
return t1 < 100000;
}
})
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Expand All @@ -664,6 +672,59 @@ public boolean test(Integer t1) {
assertEquals(NUM - 1, ts.values().get(NUM - 1).intValue());
}

@Test
public void testFlowable2() throws Exception {

final Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
AtomicInteger integer = new AtomicInteger(0);

while (integer.get() < 1000) {
e.onNext(integer.getAndIncrement());
}
Thread.sleep(10);
while (integer.get() < 1000) {
e.onNext(integer.getAndIncrement());
}

e.onComplete();
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

flowable.blockingSubscribe(new Subscriber<Integer>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(1);
}

@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("value:" + integer);
subscription.request(1);
}

@Override
public void onError(Throwable t) {

}

@Override
public void onComplete() {
System.out.println("onComplete");

}
});

}

/**
* A synchronous Observable that will emit incrementing integers as requested.
*
Expand Down Expand Up @@ -735,6 +796,7 @@ public void subscribe(Subscriber<? super Integer> s) {

static final Function<Integer, Integer> SLOW_PASS_THRU = new Function<Integer, Integer>() {
volatile int sink;

@Override
public Integer apply(Integer t1) {
// be slow ... but faster than Thread.sleep(1)
Expand Down

0 comments on commit 91fd535

Please sign in to comment.