Skip to content

Commit a5df963

Browse files
authored
2.x: coverage, cleanup fixes 10/14-1 (#4705)
1 parent d320d5c commit a5df963

32 files changed

+1777
-471
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4549,7 +4549,7 @@ public final T blockingLast(T defaultItem) {
45494549
*/
45504550
@SchedulerSupport(SchedulerSupport.NONE)
45514551
public final Iterable<T> blockingLatest() {
4552-
return BlockingObservableLatest.latest(this);
4552+
return new BlockingObservableLatest<T>(this);
45534553
}
45544554

45554555
/**
@@ -4571,7 +4571,7 @@ public final Iterable<T> blockingLatest() {
45714571
*/
45724572
@SchedulerSupport(SchedulerSupport.NONE)
45734573
public final Iterable<T> blockingMostRecent(T initialValue) {
4574-
return BlockingObservableMostRecent.mostRecent(this, initialValue);
4574+
return new BlockingObservableMostRecent<T>(this, initialValue);
45754575
}
45764576

45774577
/**
@@ -4590,7 +4590,7 @@ public final Iterable<T> blockingMostRecent(T initialValue) {
45904590
*/
45914591
@SchedulerSupport(SchedulerSupport.NONE)
45924592
public final Iterable<T> blockingNext() {
4593-
return BlockingObservableNext.next(this);
4593+
return new BlockingObservableNext<T>(this);
45944594
}
45954595

45964596
/**

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
public final class BlockingObservableIterator<T>
2626
extends AtomicReference<Disposable>
27-
implements io.reactivex.Observer<T>, Iterator<T>, Runnable, Disposable {
27+
implements io.reactivex.Observer<T>, Iterator<T>, Disposable {
2828

2929

3030
private static final long serialVersionUID = 6695226475494099826L;
@@ -38,8 +38,6 @@ public final class BlockingObservableIterator<T>
3838
volatile boolean done;
3939
Throwable error;
4040

41-
volatile boolean cancelled;
42-
4341
public BlockingObservableIterator(int batchSize) {
4442
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
4543
this.lock = new ReentrantLock();
@@ -49,9 +47,6 @@ public BlockingObservableIterator(int batchSize) {
4947
@Override
5048
public boolean hasNext() {
5149
for (;;) {
52-
if (cancelled) {
53-
return false;
54-
}
5550
boolean d = done;
5651
boolean empty = queue.isEmpty();
5752
if (d) {
@@ -64,16 +59,19 @@ public boolean hasNext() {
6459
}
6560
}
6661
if (empty) {
67-
lock.lock();
6862
try {
69-
while (!cancelled && !done && queue.isEmpty()) {
70-
condition.await();
63+
lock.lock();
64+
try {
65+
while (!done && queue.isEmpty()) {
66+
condition.await();
67+
}
68+
} finally {
69+
lock.unlock();
7170
}
7271
} catch (InterruptedException ex) {
73-
run();
72+
DisposableHelper.dispose(this);
73+
signalConsumer();
7474
throw ExceptionHelper.wrapOrThrow(ex);
75-
} finally {
76-
lock.unlock();
7775
}
7876
} else {
7977
return true;
@@ -84,15 +82,7 @@ public boolean hasNext() {
8482
@Override
8583
public T next() {
8684
if (hasNext()) {
87-
T v = queue.poll();
88-
89-
if (v == null) {
90-
run();
91-
92-
throw new IllegalStateException("Queue empty?!");
93-
}
94-
95-
return v;
85+
return queue.poll();
9686
}
9787
throw new NoSuchElementException();
9888
}
@@ -104,13 +94,8 @@ public void onSubscribe(Disposable s) {
10494

10595
@Override
10696
public void onNext(T t) {
107-
if (!queue.offer(t)) {
108-
DisposableHelper.dispose(this);
109-
110-
onError(new IllegalStateException("Queue full?!"));
111-
} else {
112-
signalConsumer();
113-
}
97+
queue.offer(t);
98+
signalConsumer();
11499
}
115100

116101
@Override
@@ -135,12 +120,6 @@ void signalConsumer() {
135120
}
136121
}
137122

138-
@Override
139-
public void run() {
140-
DisposableHelper.dispose(this);
141-
signalConsumer();
142-
}
143-
144123
@Override // otherwise default method which isn't available in Java 7
145124
public void remove() {
146125
throw new UnsupportedOperationException("remove");

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,41 +21,32 @@
2121
import io.reactivex.Observable;
2222
import io.reactivex.internal.util.ExceptionHelper;
2323
import io.reactivex.observers.DisposableObserver;
24+
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
2627
* Wait for and iterate over the latest values of the source observable. If the source works faster than the
2728
* iterator, values may be skipped, but not the {@code onError} or {@code onComplete} events.
29+
* @param <T> the value type
2830
*/
29-
public enum BlockingObservableLatest {
30-
;
31-
32-
/**
33-
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
34-
* been returned by the {@code Iterable}, then returns that item.
35-
*
36-
* @param <T> the value type
37-
* @param source
38-
* the source {@code Observable}
39-
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
40-
* been returned by the {@code Iterable}, then returns that item
41-
*/
42-
public static <T> Iterable<T> latest(final ObservableSource<? extends T> source) {
43-
return new Iterable<T>() {
44-
@Override
45-
public Iterator<T> iterator() {
46-
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
47-
48-
@SuppressWarnings("unchecked")
49-
Observable<Notification<T>> materialized = Observable.wrap((ObservableSource<T>)source).materialize();
50-
51-
materialized.subscribe(lio);
52-
return lio;
53-
}
54-
};
31+
public final class BlockingObservableLatest<T> implements Iterable<T> {
32+
33+
final ObservableSource<T> source;
34+
35+
public BlockingObservableLatest(ObservableSource<T> source) {
36+
this.source = source;
37+
}
38+
39+
@Override
40+
public Iterator<T> iterator() {
41+
BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();
42+
43+
Observable<Notification<T>> materialized = Observable.wrap(source).materialize();
44+
45+
materialized.subscribe(lio);
46+
return lio;
5547
}
5648

57-
/** Observer of source, iterator for output. */
58-
static final class LatestObserverIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
49+
static final class BlockingObservableLatestIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
5950
// iterator's notification
6051
Notification<T> iteratorNotification;
6152

@@ -73,7 +64,7 @@ public void onNext(Notification<T> args) {
7364

7465
@Override
7566
public void onError(Throwable e) {
76-
// not expected
67+
RxJavaPlugins.onError(e);
7768
}
7869

7970
@Override
@@ -86,22 +77,20 @@ public boolean hasNext() {
8677
if (iteratorNotification != null && iteratorNotification.isOnError()) {
8778
throw ExceptionHelper.wrapOrThrow(iteratorNotification.getError());
8879
}
89-
if (iteratorNotification == null || iteratorNotification.isOnNext()) {
90-
if (iteratorNotification == null) {
91-
try {
92-
notify.acquire();
93-
} catch (InterruptedException ex) {
94-
dispose();
95-
Thread.currentThread().interrupt();
96-
iteratorNotification = Notification.createOnError(ex);
97-
throw ExceptionHelper.wrapOrThrow(ex);
98-
}
99-
100-
Notification<T> n = value.getAndSet(null);
101-
iteratorNotification = n;
102-
if (n.isOnError()) {
103-
throw ExceptionHelper.wrapOrThrow(n.getError());
104-
}
80+
if (iteratorNotification == null) {
81+
try {
82+
notify.acquire();
83+
} catch (InterruptedException ex) {
84+
dispose();
85+
Thread.currentThread().interrupt();
86+
iteratorNotification = Notification.createOnError(ex);
87+
throw ExceptionHelper.wrapOrThrow(ex);
88+
}
89+
90+
Notification<T> n = value.getAndSet(null);
91+
iteratorNotification = n;
92+
if (n.isOnError()) {
93+
throw ExceptionHelper.wrapOrThrow(n.getError());
10594
}
10695
}
10796
return iteratorNotification.isOnNext();
@@ -110,11 +99,9 @@ public boolean hasNext() {
11099
@Override
111100
public T next() {
112101
if (hasNext()) {
113-
if (iteratorNotification.isOnNext()) {
114-
T v = iteratorNotification.getValue();
115-
iteratorNotification = null;
116-
return v;
117-
}
102+
T v = iteratorNotification.getValue();
103+
iteratorNotification = null;
104+
return v;
118105
}
119106
throw new NoSuchElementException();
120107
}
@@ -123,6 +110,5 @@ public T next() {
123110
public void remove() {
124111
throw new UnsupportedOperationException("Read-only iterator.");
125112
}
126-
127113
}
128114
}

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,36 +25,31 @@
2525
* seed value if no item has yet been emitted.
2626
* <p>
2727
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
28+
*
29+
* @param <T> the value type
2830
*/
29-
public enum BlockingObservableMostRecent {
30-
;
31-
/**
32-
* Returns an {@code Iterable} that always returns the item most recently emitted by the {@code Observable}.
33-
*
34-
* @param <T> the value type
35-
* @param source
36-
* the source {@code Observable}
37-
* @param initialValue
38-
* a default item to return from the {@code Iterable} if {@code source} has not yet emitted any
39-
* items
40-
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or
41-
* {@code initialValue} if {@code source} has not yet emitted any items
42-
*/
43-
public static <T> Iterable<T> mostRecent(final ObservableSource<? extends T> source, final T initialValue) {
44-
return new Iterable<T>() {
45-
@Override
46-
public Iterator<T> iterator() {
47-
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
31+
public final class BlockingObservableMostRecent<T> implements Iterable<T> {
4832

49-
/**
50-
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
51-
* since it is for BlockingObservable.
52-
*/
53-
source.subscribe(mostRecentObserver);
33+
final ObservableSource<T> source;
34+
35+
final T initialValue;
36+
37+
public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue) {
38+
this.source = source;
39+
this.initialValue = initialValue;
40+
}
41+
42+
@Override
43+
public Iterator<T> iterator() {
44+
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
45+
46+
/**
47+
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
48+
* since it is for BlockingObservable.
49+
*/
50+
source.subscribe(mostRecentObserver);
5451

55-
return mostRecentObserver.getIterable();
56-
}
57-
};
52+
return mostRecentObserver.getIterable();
5853
}
5954

6055
static final class MostRecentObserver<T> extends DefaultObserver<T> {

0 commit comments

Comments
 (0)