Skip to content

Commit 60dbbed

Browse files
davidmotenakarnokd
authored andcommitted
OperatorReduce - prevent multiple terminal events (#4246)
1 parent 689b22d commit 60dbbed

File tree

3 files changed

+294
-140
lines changed

3 files changed

+294
-140
lines changed

src/main/java/rx/internal/operators/OnSubscribeReduce.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable.OnSubscribe;
2323
import rx.exceptions.Exceptions;
2424
import rx.functions.Func2;
25+
import rx.plugins.RxJavaHooks;
2526

2627
public final class OnSubscribeReduce<T> implements OnSubscribe<T> {
2728

@@ -57,6 +58,8 @@ static final class ReduceSubscriber<T> extends Subscriber<T> {
5758

5859
static final Object EMPTY = new Object();
5960

61+
boolean done;
62+
6063
@SuppressWarnings("unchecked")
6164
public ReduceSubscriber(Subscriber<? super T> actual, Func2<T, T, T> reducer) {
6265
this.actual = actual;
@@ -68,6 +71,9 @@ public ReduceSubscriber(Subscriber<? super T> actual, Func2<T, T, T> reducer) {
6871
@SuppressWarnings("unchecked")
6972
@Override
7073
public void onNext(T t) {
74+
if (done) {
75+
return;
76+
}
7177
Object o = value;
7278
if (o == EMPTY) {
7379
value = t;
@@ -77,19 +83,28 @@ public void onNext(T t) {
7783
} catch (Throwable ex) {
7884
Exceptions.throwIfFatal(ex);
7985
unsubscribe();
80-
actual.onError(ex);
86+
onError(ex);
8187
}
8288
}
8389
}
8490

8591
@Override
8692
public void onError(Throwable e) {
87-
actual.onError(e);
93+
if (!done) {
94+
done = true;
95+
actual.onError(e);
96+
} else {
97+
RxJavaHooks.onError(e);
98+
}
8899
}
89100

90101
@SuppressWarnings("unchecked")
91102
@Override
92103
public void onCompleted() {
104+
if (done) {
105+
return;
106+
}
107+
done = true;
93108
Object o = value;
94109
if (o != EMPTY) {
95110
actual.onNext((T)o);
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.mockito.Matchers.any;
21+
import static org.mockito.Mockito.never;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.NoSuchElementException;
28+
import java.util.concurrent.CopyOnWriteArrayList;
29+
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.mockito.Mock;
33+
import org.mockito.MockitoAnnotations;
34+
35+
import rx.*;
36+
import rx.Observable.OnSubscribe;
37+
import rx.Observer;
38+
import rx.exceptions.TestException;
39+
import rx.functions.Action1;
40+
import rx.functions.Func1;
41+
import rx.functions.Func2;
42+
import rx.internal.util.UtilityFunctions;
43+
import rx.observers.TestSubscriber;
44+
import rx.plugins.RxJavaHooks;
45+
46+
public class OnSubscribeReduceTest {
47+
@Mock
48+
Observer<Object> observer;
49+
50+
@Before
51+
public void before() {
52+
MockitoAnnotations.initMocks(this);
53+
}
54+
55+
Func2<Integer, Integer, Integer> sum = new Func2<Integer, Integer, Integer>() {
56+
@Override
57+
public Integer call(Integer t1, Integer t2) {
58+
return t1 + t2;
59+
}
60+
};
61+
62+
@Test
63+
public void testAggregateAsIntSum() {
64+
65+
Observable<Integer> result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum).map(UtilityFunctions.<Integer> identity());
66+
67+
result.subscribe(observer);
68+
69+
verify(observer).onNext(1 + 2 + 3 + 4 + 5);
70+
verify(observer).onCompleted();
71+
verify(observer, never()).onError(any(Throwable.class));
72+
}
73+
74+
@Test
75+
public void testAggregateAsIntSumSourceThrows() {
76+
Observable<Integer> result = Observable.concat(Observable.just(1, 2, 3, 4, 5),
77+
Observable.<Integer> error(new TestException()))
78+
.reduce(0, sum).map(UtilityFunctions.<Integer> identity());
79+
80+
result.subscribe(observer);
81+
82+
verify(observer, never()).onNext(any());
83+
verify(observer, never()).onCompleted();
84+
verify(observer, times(1)).onError(any(TestException.class));
85+
}
86+
87+
@Test
88+
public void testAggregateAsIntSumAccumulatorThrows() {
89+
Func2<Integer, Integer, Integer> sumErr = new Func2<Integer, Integer, Integer>() {
90+
@Override
91+
public Integer call(Integer t1, Integer t2) {
92+
throw new TestException();
93+
}
94+
};
95+
96+
Observable<Integer> result = Observable.just(1, 2, 3, 4, 5)
97+
.reduce(0, sumErr).map(UtilityFunctions.<Integer> identity());
98+
99+
result.subscribe(observer);
100+
101+
verify(observer, never()).onNext(any());
102+
verify(observer, never()).onCompleted();
103+
verify(observer, times(1)).onError(any(TestException.class));
104+
}
105+
106+
@Test
107+
public void testAggregateAsIntSumResultSelectorThrows() {
108+
109+
Func1<Integer, Integer> error = new Func1<Integer, Integer>() {
110+
111+
@Override
112+
public Integer call(Integer t1) {
113+
throw new TestException();
114+
}
115+
};
116+
117+
Observable<Integer> result = Observable.just(1, 2, 3, 4, 5)
118+
.reduce(0, sum).map(error);
119+
120+
result.subscribe(observer);
121+
122+
verify(observer, never()).onNext(any());
123+
verify(observer, never()).onCompleted();
124+
verify(observer, times(1)).onError(any(TestException.class));
125+
}
126+
127+
@Test
128+
public void testBackpressureWithNoInitialValue() throws InterruptedException {
129+
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
130+
Observable<Integer> reduced = source.reduce(sum);
131+
132+
Integer r = reduced.toBlocking().first();
133+
assertEquals(21, r.intValue());
134+
}
135+
136+
@Test
137+
public void testBackpressureWithInitialValue() throws InterruptedException {
138+
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
139+
Observable<Integer> reduced = source.reduce(0, sum);
140+
141+
Integer r = reduced.toBlocking().first();
142+
assertEquals(21, r.intValue());
143+
}
144+
145+
@Test
146+
public void testNoInitialValueDoesNotEmitMultipleTerminalEvents() {
147+
TestSubscriber<Integer> ts = TestSubscriber.create();
148+
Observable.create(new OnSubscribe<Integer>() {
149+
150+
@Override
151+
public void call(final Subscriber<? super Integer> sub) {
152+
sub.setProducer(new Producer() {
153+
154+
@Override
155+
public void request(long n) {
156+
if (n > 0) {
157+
sub.onNext(1);
158+
sub.onNext(2);
159+
sub.onCompleted();
160+
}
161+
}
162+
});
163+
}
164+
})
165+
.reduce(new Func2<Integer, Integer, Integer>() {
166+
167+
@Override
168+
public Integer call(Integer a, Integer b) {
169+
throw new RuntimeException("boo");
170+
}})
171+
.unsafeSubscribe(ts);
172+
ts.assertError(RuntimeException.class);
173+
ts.assertNotCompleted();
174+
}
175+
176+
@Test
177+
public void testNoInitialValueUpstreamEmitsMoreOnNextDespiteUnsubscription() {
178+
TestSubscriber<Integer> ts = TestSubscriber.create();
179+
Observable.create(new OnSubscribe<Integer>() {
180+
181+
@Override
182+
public void call(final Subscriber<? super Integer> sub) {
183+
sub.setProducer(new Producer() {
184+
185+
@Override
186+
public void request(long n) {
187+
if (n > 2) {
188+
sub.onNext(1);
189+
sub.onNext(2);
190+
sub.onNext(3);
191+
sub.onCompleted();
192+
}
193+
}
194+
});
195+
}
196+
})
197+
.reduce(new Func2<Integer, Integer, Integer>() {
198+
boolean once = true;
199+
200+
@Override
201+
public Integer call(Integer a, Integer b) {
202+
if (once) {
203+
throw new RuntimeException("boo");
204+
} else {
205+
once = false;
206+
return a + b;
207+
}
208+
}})
209+
.unsafeSubscribe(ts);
210+
ts.assertNoValues();
211+
ts.assertError(RuntimeException.class);
212+
ts.assertNotCompleted();
213+
}
214+
215+
@Test
216+
public void testNoInitialValueDoesNotEmitMultipleErrorEventsAndReportsSecondErrorToHooks() {
217+
try {
218+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
219+
RxJavaHooks.setOnError(new Action1<Throwable>() {
220+
221+
@Override
222+
public void call(Throwable t) {
223+
list.add(t);
224+
}
225+
});
226+
TestSubscriber<Integer> ts = TestSubscriber.create();
227+
final RuntimeException e1 = new RuntimeException("e1");
228+
final Throwable e2 = new RuntimeException("e2");
229+
Observable.create(new OnSubscribe<Integer>() {
230+
231+
@Override
232+
public void call(final Subscriber<? super Integer> sub) {
233+
sub.setProducer(new Producer() {
234+
235+
@Override
236+
public void request(long n) {
237+
if (n > 1) {
238+
sub.onNext(1);
239+
sub.onNext(2);
240+
sub.onError(e2);
241+
}
242+
}
243+
});
244+
}
245+
})
246+
.reduce(new Func2<Integer, Integer, Integer>() {
247+
248+
@Override
249+
public Integer call(Integer a, Integer b) {
250+
throw e1;
251+
}})
252+
.unsafeSubscribe(ts);
253+
ts.assertNotCompleted();
254+
System.out.println(ts.getOnErrorEvents());
255+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
256+
assertEquals(Arrays.asList(e2), list);
257+
} finally {
258+
RxJavaHooks.setOnError(null);
259+
}
260+
}
261+
262+
263+
@Test
264+
public void testNoInitialValueEmitsNoSuchElementExceptionIfEmptyStream() {
265+
TestSubscriber<Integer> ts = TestSubscriber.create();
266+
Observable.<Integer>empty().reduce(new Func2<Integer, Integer, Integer>() {
267+
268+
@Override
269+
public Integer call(Integer a, Integer b) {
270+
return a + b;
271+
}
272+
}).subscribe(ts);
273+
ts.assertError(NoSuchElementException.class);
274+
}
275+
276+
277+
}

0 commit comments

Comments
 (0)