Skip to content

Commit ba85468

Browse files
Merge pull request ReactiveX#1950 from zsxwing/unsubscribed
Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue
2 parents 162e042 + 35f2807 commit ba85468

13 files changed

+67
-26
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7321,7 +7321,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
73217321
// TODO why aren't we throwing the hook's return value.
73227322
throw r;
73237323
}
7324-
return Subscriptions.empty();
7324+
return Subscriptions.unsubscribed();
73257325
}
73267326
}
73277327

@@ -7410,7 +7410,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
74107410
// TODO why aren't we throwing the hook's return value.
74117411
throw r;
74127412
}
7413-
return Subscriptions.empty();
7413+
return Subscriptions.unsubscribed();
74147414
}
74157415
}
74167416

src/main/java/rx/internal/operators/OperatorTimeoutWithSelector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Subscription call(
5151
} catch (Throwable t) {
5252
Exceptions.throwIfFatal(t);
5353
timeoutSubscriber.onError(t);
54-
return Subscriptions.empty();
54+
return Subscriptions.unsubscribed();
5555
}
5656
return o.unsafeSubscribe(new Subscriber<U>() {
5757

@@ -72,7 +72,7 @@ public void onNext(U t) {
7272

7373
});
7474
} else {
75-
return Subscriptions.empty();
75+
return Subscriptions.unsubscribed();
7676
}
7777
}
7878
}, new TimeoutStub<T>() {
@@ -87,7 +87,7 @@ public Subscription call(
8787
} catch (Throwable t) {
8888
Exceptions.throwIfFatal(t);
8989
timeoutSubscriber.onError(t);
90-
return Subscriptions.empty();
90+
return Subscriptions.unsubscribed();
9191
}
9292
return o.unsafeSubscribe(new Subscriber<V>() {
9393

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Subscription schedule(final Action0 action) {
5858
@Override
5959
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
6060
if (isUnsubscribed) {
61-
return Subscriptions.empty();
61+
return Subscriptions.unsubscribed();
6262
}
6363
return scheduleActual(action, delayTime, unit);
6464
}

src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public Subscription schedule(Action0 action) {
141141
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
142142
if (innerSubscription.isUnsubscribed()) {
143143
// don't schedule, we are unsubscribed
144-
return Subscriptions.empty();
144+
return Subscriptions.unsubscribed();
145145
}
146146

147147
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);

src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public Subscription schedule(Action0 action) {
9595
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
9696
if (innerSubscription.isUnsubscribed()) {
9797
// don't schedule, we are unsubscribed
98-
return Subscriptions.empty();
98+
return Subscriptions.unsubscribed();
9999
}
100100

101101
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public ExecutorSchedulerWorker(Executor executor) {
7171
@Override
7272
public Subscription schedule(Action0 action) {
7373
if (isUnsubscribed()) {
74-
return Subscriptions.empty();
74+
return Subscriptions.unsubscribed();
7575
}
7676
ExecutorAction ea = new ExecutorAction(action, tasks);
7777
tasks.add(ea);
@@ -106,7 +106,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
106106
return schedule(action);
107107
}
108108
if (isUnsubscribed()) {
109-
return Subscriptions.empty();
109+
return Subscriptions.unsubscribed();
110110
}
111111
ScheduledExecutorService service;
112112
if (executor instanceof ScheduledExecutorService) {

src/main/java/rx/schedulers/ImmediateScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
5656
@Override
5757
public Subscription schedule(Action0 action) {
5858
action.call();
59-
return Subscriptions.empty();
59+
return Subscriptions.unsubscribed();
6060
}
6161

6262
@Override

src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
6868

6969
private Subscription enqueue(Action0 action, long execTime) {
7070
if (innerSubscription.isUnsubscribed()) {
71-
return Subscriptions.empty();
71+
return Subscriptions.unsubscribed();
7272
}
7373
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
7474
queue.add(timedAction);
@@ -81,7 +81,7 @@ private Subscription enqueue(Action0 action, long execTime) {
8181
polled.action.call();
8282
}
8383
} while (wip.decrementAndGet() > 0);
84-
return Subscriptions.empty();
84+
return Subscriptions.unsubscribed();
8585
} else {
8686
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
8787
return Subscriptions.create(new Action0() {

src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@
2525
* if unsubscribed.
2626
*/
2727
public final class MultipleAssignmentSubscription implements Subscription {
28-
/** The shared empty state. */
29-
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
30-
volatile State state = EMPTY_STATE;
28+
29+
volatile State state = new State(false, Subscriptions.empty());
3130
static final AtomicReferenceFieldUpdater<MultipleAssignmentSubscription, State> STATE_UPDATER
3231
= AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state");
3332

src/main/java/rx/subscriptions/RefCountSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Subscription get() {
8080
do {
8181
oldState = state;
8282
if (oldState.isUnsubscribed) {
83-
return Subscriptions.empty();
83+
return Subscriptions.unsubscribed();
8484
} else {
8585
newState = oldState.addChild();
8686
}

0 commit comments

Comments
 (0)