Skip to content

2.x: coverage and cleanup 10/12-1 #4696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ public void onSubscribe(Disposable d) {
}
return;
}
if (once.get() || set.isDisposed()) {
return;
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand All @@ -36,133 +37,218 @@ public CompletableConcat(Publisher<? extends CompletableSource> sources, int pre

@Override
public void subscribeActual(CompletableObserver s) {
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
sources.subscribe(parent);
sources.subscribe(new CompletableConcatSubscriber(s, prefetch));
}

static final class CompletableConcatSubscriber
extends AtomicInteger
implements Subscriber<CompletableSource>, Disposable {
private static final long serialVersionUID = 7412667182931235013L;
private static final long serialVersionUID = 9032184911934499404L;

final CompletableObserver actual;

final int prefetch;
final SequentialDisposable sd;

final SpscArrayQueue<CompletableSource> queue;
final int limit;

final ConcatInnerObserver inner;

final AtomicBoolean once;

int sourceFused;

int consumed;

SimpleQueue<CompletableSource> queue;

Subscription s;

volatile boolean done;

final AtomicBoolean once = new AtomicBoolean();

final ConcatInnerObserver inner;
volatile boolean active;

CompletableConcatSubscriber(CompletableObserver actual, int prefetch) {
this.actual = actual;
this.prefetch = prefetch;
this.queue = new SpscArrayQueue<CompletableSource>(prefetch);
this.sd = new SequentialDisposable();
this.inner = new ConcatInnerObserver();
this.inner = new ConcatInnerObserver(this);
this.once = new AtomicBoolean();
this.limit = prefetch - (prefetch >> 2);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

long r = prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch;

if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<CompletableSource> qs = (QueueSubscription<CompletableSource>) s;

int m = qs.requestFusion(QueueSubscription.ANY);

if (m == QueueSubscription.SYNC) {
sourceFused = m;
queue = qs;
done = true;
actual.onSubscribe(this);
drain();
return;
}
if (m == QueueSubscription.ASYNC) {
sourceFused = m;
queue = qs;
actual.onSubscribe(this);
s.request(r);
return;
}
}

if (prefetch == Integer.MAX_VALUE) {
queue = new SpscLinkedArrayQueue<CompletableSource>(Flowable.bufferSize());
} else {
queue = new SpscArrayQueue<CompletableSource>(prefetch);
}

actual.onSubscribe(this);
s.request(prefetch);

s.request(r);
}
}

@Override
public void onNext(CompletableSource t) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
return;
}
if (getAndIncrement() == 0) {
next();
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
return;
}
}
drain();
}

@Override
public void onError(Throwable t) {
if (once.compareAndSet(false, true)) {
DisposableHelper.dispose(inner);
actual.onError(t);
return;
} else {
RxJavaPlugins.onError(t);
}
done = true;
RxJavaPlugins.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
if (getAndIncrement() == 0) {
next();
}
}

void innerError(Throwable e) {
s.cancel();
onError(e);
}

void innerComplete() {
if (decrementAndGet() != 0) {
next();
}
if (!done) {
s.request(1);
}
drain();
}

@Override
public void dispose() {
s.cancel();
sd.dispose();
DisposableHelper.dispose(inner);
}

@Override
public boolean isDisposed() {
return sd.isDisposed();
return DisposableHelper.isDisposed(inner.get());
}

void next() {
boolean d = done;
CompletableSource c = queue.poll();
if (c == null) {
if (d) {
if (once.compareAndSet(false, true)) {
actual.onComplete();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}

for (;;) {
if (isDisposed()) {
return;
}
RxJavaPlugins.onError(new IllegalStateException("Queue is empty?!"));
return;

if (!active) {

boolean d = done;

CompletableSource cs;

try {
cs = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
innerError(ex);
return;
}

boolean empty = cs == null;

if (d && empty) {
if (once.compareAndSet(false, true)) {
actual.onComplete();
}
return;
}

if (!empty) {
active = true;
cs.subscribe(inner);
request();
}
}

if (decrementAndGet() == 0) {
break;
}
}
}

void request() {
if (sourceFused != QueueSubscription.SYNC) {
int p = consumed + 1;
if (p == limit) {
consumed = 0;
s.request(p);
} else {
consumed = p;
}
}
}

void innerError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.cancel();
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

c.subscribe(inner);
void innerComplete() {
active = false;
drain();
}

final class ConcatInnerObserver implements CompletableObserver {
static final class ConcatInnerObserver extends AtomicReference<Disposable> implements CompletableObserver {
private static final long serialVersionUID = -5454794857847146511L;

final CompletableConcatSubscriber parent;

ConcatInnerObserver(CompletableConcatSubscriber parent) {
this.parent = parent;
}

@Override
public void onSubscribe(Disposable d) {
sd.update(d);
DisposableHelper.replace(this, d);
}

@Override
public void onError(Throwable e) {
innerError(e);
parent.innerError(e);
}

@Override
public void onComplete() {
innerComplete();
parent.innerComplete();
}
}
}
Expand Down
Loading