Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage, cleanup fixes 10/14-1 #4705

Merged
merged 1 commit into from
Oct 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4549,7 +4549,7 @@ public final T blockingLast(T defaultItem) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingLatest() {
return BlockingObservableLatest.latest(this);
return new BlockingObservableLatest<T>(this);
}

/**
Expand All @@ -4571,7 +4571,7 @@ public final Iterable<T> blockingLatest() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingMostRecent(T initialValue) {
return BlockingObservableMostRecent.mostRecent(this, initialValue);
return new BlockingObservableMostRecent<T>(this, initialValue);
}

/**
Expand All @@ -4590,7 +4590,7 @@ public final Iterable<T> blockingMostRecent(T initialValue) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingNext() {
return BlockingObservableNext.next(this);
return new BlockingObservableNext<T>(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public final class BlockingObservableIterator<T>
extends AtomicReference<Disposable>
implements io.reactivex.Observer<T>, Iterator<T>, Runnable, Disposable {
implements io.reactivex.Observer<T>, Iterator<T>, Disposable {


private static final long serialVersionUID = 6695226475494099826L;
Expand All @@ -38,8 +38,6 @@ public final class BlockingObservableIterator<T>
volatile boolean done;
Throwable error;

volatile boolean cancelled;

public BlockingObservableIterator(int batchSize) {
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
this.lock = new ReentrantLock();
Expand All @@ -49,9 +47,6 @@ public BlockingObservableIterator(int batchSize) {
@Override
public boolean hasNext() {
for (;;) {
if (cancelled) {
return false;
}
boolean d = done;
boolean empty = queue.isEmpty();
if (d) {
Expand All @@ -64,16 +59,19 @@ public boolean hasNext() {
}
}
if (empty) {
lock.lock();
try {
while (!cancelled && !done && queue.isEmpty()) {
condition.await();
lock.lock();
try {
while (!done && queue.isEmpty()) {
condition.await();
}
} finally {
lock.unlock();
}
} catch (InterruptedException ex) {
run();
DisposableHelper.dispose(this);
signalConsumer();
throw ExceptionHelper.wrapOrThrow(ex);
} finally {
lock.unlock();
}
} else {
return true;
Expand All @@ -84,15 +82,7 @@ public boolean hasNext() {
@Override
public T next() {
if (hasNext()) {
T v = queue.poll();

if (v == null) {
run();

throw new IllegalStateException("Queue empty?!");
}

return v;
return queue.poll();
}
throw new NoSuchElementException();
}
Expand All @@ -104,13 +94,8 @@ public void onSubscribe(Disposable s) {

@Override
public void onNext(T t) {
if (!queue.offer(t)) {
DisposableHelper.dispose(this);

onError(new IllegalStateException("Queue full?!"));
} else {
signalConsumer();
}
queue.offer(t);
signalConsumer();
}

@Override
Expand All @@ -135,12 +120,6 @@ void signalConsumer() {
}
}

@Override
public void run() {
DisposableHelper.dispose(this);
signalConsumer();
}

@Override // otherwise default method which isn't available in Java 7
public void remove() {
throw new UnsupportedOperationException("remove");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,32 @@
import io.reactivex.Observable;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Wait for and iterate over the latest values of the source observable. If the source works faster than the
* iterator, values may be skipped, but not the {@code onError} or {@code onComplete} events.
* @param <T> the value type
*/
public enum BlockingObservableLatest {
;

/**
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item.
*
* @param <T> the value type
* @param source
* the source {@code Observable}
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item
*/
public static <T> Iterable<T> latest(final ObservableSource<? extends T> source) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();

@SuppressWarnings("unchecked")
Observable<Notification<T>> materialized = Observable.wrap((ObservableSource<T>)source).materialize();

materialized.subscribe(lio);
return lio;
}
};
public final class BlockingObservableLatest<T> implements Iterable<T> {

final ObservableSource<T> source;

public BlockingObservableLatest(ObservableSource<T> source) {
this.source = source;
}

@Override
public Iterator<T> iterator() {
BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();

Observable<Notification<T>> materialized = Observable.wrap(source).materialize();

materialized.subscribe(lio);
return lio;
}

/** Observer of source, iterator for output. */
static final class LatestObserverIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
static final class BlockingObservableLatestIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
// iterator's notification
Notification<T> iteratorNotification;

Expand All @@ -73,7 +64,7 @@ public void onNext(Notification<T> args) {

@Override
public void onError(Throwable e) {
// not expected
RxJavaPlugins.onError(e);
}

@Override
Expand All @@ -86,22 +77,20 @@ public boolean hasNext() {
if (iteratorNotification != null && iteratorNotification.isOnError()) {
throw ExceptionHelper.wrapOrThrow(iteratorNotification.getError());
}
if (iteratorNotification == null || iteratorNotification.isOnNext()) {
if (iteratorNotification == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
dispose();
Thread.currentThread().interrupt();
iteratorNotification = Notification.createOnError(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}

Notification<T> n = value.getAndSet(null);
iteratorNotification = n;
if (n.isOnError()) {
throw ExceptionHelper.wrapOrThrow(n.getError());
}
if (iteratorNotification == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
dispose();
Thread.currentThread().interrupt();
iteratorNotification = Notification.createOnError(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}

Notification<T> n = value.getAndSet(null);
iteratorNotification = n;
if (n.isOnError()) {
throw ExceptionHelper.wrapOrThrow(n.getError());
}
}
return iteratorNotification.isOnNext();
Expand All @@ -110,11 +99,9 @@ public boolean hasNext() {
@Override
public T next() {
if (hasNext()) {
if (iteratorNotification.isOnNext()) {
T v = iteratorNotification.getValue();
iteratorNotification = null;
return v;
}
T v = iteratorNotification.getValue();
iteratorNotification = null;
return v;
}
throw new NoSuchElementException();
}
Expand All @@ -123,6 +110,5 @@ public T next() {
public void remove() {
throw new UnsupportedOperationException("Read-only iterator.");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,31 @@
* seed value if no item has yet been emitted.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
*
* @param <T> the value type
*/
public enum BlockingObservableMostRecent {
;
/**
* Returns an {@code Iterable} that always returns the item most recently emitted by the {@code Observable}.
*
* @param <T> the value type
* @param source
* the source {@code Observable}
* @param initialValue
* a default item to return from the {@code Iterable} if {@code source} has not yet emitted any
* items
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or
* {@code initialValue} if {@code source} has not yet emitted any items
*/
public static <T> Iterable<T> mostRecent(final ObservableSource<? extends T> source, final T initialValue) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
public final class BlockingObservableMostRecent<T> implements Iterable<T> {

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);
final ObservableSource<T> source;

final T initialValue;

public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue) {
this.source = source;
this.initialValue = initialValue;
}

@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return mostRecentObserver.getIterable();
}
};
return mostRecentObserver.getIterable();
}

static final class MostRecentObserver<T> extends DefaultObserver<T> {
Expand Down
Loading