Skip to content

Commit a902d4a

Browse files
authored
1 parent 9b91d4e commit a902d4a

File tree

4 files changed

+460
-1
lines changed

4 files changed

+460
-1
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,27 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) {
971971
return observer.blockingGetError(timeout, unit);
972972
}
973973

974+
/**
975+
* Subscribes to this Completable only once, when the first CompletableObserver
976+
* subscribes to the result Completable, caches its terminal event
977+
* and relays/replays it to observers.
978+
* <p>
979+
* Note that this operator doesn't allow disposing the connection
980+
* of the upstream source.
981+
* <dl>
982+
* <dt><b>Scheduler:</b></dt>
983+
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
984+
* </dl>
985+
* @return the new Completable instance
986+
* @since 2.0.4 - experimental
987+
*/
988+
@CheckReturnValue
989+
@SchedulerSupport(SchedulerSupport.NONE)
990+
@Experimental
991+
public final Completable cache() {
992+
return RxJavaPlugins.onAssembly(new CompletableCache(this));
993+
}
994+
974995
/**
975996
* Calls the given transformer function with this instance and returns the function's resulting
976997
* Completable.
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.Experimental;
20+
import io.reactivex.disposables.Disposable;
21+
22+
/**
23+
* Consume the upstream source exactly once and cache its terminal event.
24+
*
25+
* @since 2.0.4 - experimental
26+
*/
27+
@Experimental
28+
public final class CompletableCache extends Completable implements CompletableObserver {
29+
30+
static final InnerCompletableCache[] EMPTY = new InnerCompletableCache[0];
31+
32+
static final InnerCompletableCache[] TERMINATED = new InnerCompletableCache[0];
33+
34+
final CompletableSource source;
35+
36+
final AtomicReference<InnerCompletableCache[]> observers;
37+
38+
final AtomicBoolean once;
39+
40+
Throwable error;
41+
42+
public CompletableCache(CompletableSource source) {
43+
this.source = source;
44+
this.observers = new AtomicReference<InnerCompletableCache[]>(EMPTY);
45+
this.once = new AtomicBoolean();
46+
}
47+
48+
@Override
49+
protected void subscribeActual(CompletableObserver s) {
50+
InnerCompletableCache inner = new InnerCompletableCache(s);
51+
s.onSubscribe(inner);
52+
53+
if (add(inner)) {
54+
if (inner.isDisposed()) {
55+
remove(inner);
56+
}
57+
58+
if (once.compareAndSet(false, true)) {
59+
source.subscribe(this);
60+
}
61+
} else {
62+
Throwable ex = error;
63+
if (ex != null) {
64+
s.onError(ex);
65+
} else {
66+
s.onComplete();
67+
}
68+
}
69+
}
70+
71+
@Override
72+
public void onSubscribe(Disposable d) {
73+
// not used
74+
}
75+
76+
@Override
77+
public void onError(Throwable e) {
78+
error = e;
79+
for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) {
80+
if (!inner.get()) {
81+
inner.actual.onError(e);
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void onComplete() {
88+
for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) {
89+
if (!inner.get()) {
90+
inner.actual.onComplete();
91+
}
92+
}
93+
}
94+
95+
boolean add(InnerCompletableCache inner) {
96+
for (;;) {
97+
InnerCompletableCache[] a = observers.get();
98+
if (a == TERMINATED) {
99+
return false;
100+
}
101+
int n = a.length;
102+
InnerCompletableCache[] b = new InnerCompletableCache[n + 1];
103+
System.arraycopy(a, 0, b, 0, n);
104+
b[n] = inner;
105+
if (observers.compareAndSet(a, b)) {
106+
return true;
107+
}
108+
}
109+
}
110+
111+
void remove(InnerCompletableCache inner) {
112+
for (;;) {
113+
InnerCompletableCache[] a = observers.get();
114+
int n = a.length;
115+
if (n == 0) {
116+
return;
117+
}
118+
119+
int j = -1;
120+
121+
for (int i = 0; i < n; i++) {
122+
if (a[i] == inner) {
123+
j = i;
124+
break;
125+
}
126+
}
127+
128+
if (j < 0) {
129+
return;
130+
}
131+
132+
InnerCompletableCache[] b;
133+
134+
if (n == 1) {
135+
b = EMPTY;
136+
} else {
137+
b = new InnerCompletableCache[n - 1];
138+
System.arraycopy(a, 0, b, 0, j);
139+
System.arraycopy(a, j + 1, b, j, n - j - 1);
140+
}
141+
142+
if (observers.compareAndSet(a, b)) {
143+
break;
144+
}
145+
}
146+
}
147+
148+
final class InnerCompletableCache
149+
extends AtomicBoolean
150+
implements Disposable {
151+
152+
private static final long serialVersionUID = 8943152917179642732L;
153+
154+
final CompletableObserver actual;
155+
156+
InnerCompletableCache(CompletableObserver actual) {
157+
this.actual = actual;
158+
}
159+
160+
@Override
161+
public boolean isDisposed() {
162+
return get();
163+
}
164+
165+
@Override
166+
public void dispose() {
167+
if (compareAndSet(false, true)) {
168+
remove(this);
169+
}
170+
}
171+
}
172+
}

src/test/java/io/reactivex/JavadocWording.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ public void completableDocRefersToCompletableTypes() throws Exception {
662662
&& !m.signature.contains("TestObserver")) {
663663

664664
if (idx < 11 || !m.javadoc.substring(idx - 11, idx + 8).equals("CompletableObserver")) {
665-
e.append("java.lang.RuntimeException: Maybe doc mentions Observer but not using Observable\r\n at io.reactivex.")
665+
e.append("java.lang.RuntimeException: Completable doc mentions Observer but not using Observable\r\n at io.reactivex.")
666666
.append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n");
667667
}
668668
}

0 commit comments

Comments
 (0)