Skip to content

Commit d28ce97

Browse files
committed
Merge pull request #8 from davidmoten/unsub-check
add check for unsubscribed before next read
2 parents 1585c40 + 9d7159e commit d28ce97

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

src/main/java/rx/observables/StringObservable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public void call(Subscriber<? super String> o) {
176176
n = i.read(buffer);
177177
while (n != -1 && !o.isUnsubscribed()) {
178178
o.onNext(new String(buffer, 0, n));
179-
n = i.read(buffer);
179+
if (!o.isUnsubscribed())
180+
n = i.read(buffer);
180181
}
181182
} catch (IOException e) {
182183
o.onError(e);

src/test/java/rx/observables/StringObservableTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.io.ByteArrayInputStream;
3939
import java.io.FilterReader;
4040
import java.io.IOException;
41+
import java.io.InputStreamReader;
4142
import java.io.Reader;
4243
import java.io.StringReader;
4344
import java.nio.charset.Charset;
@@ -303,6 +304,23 @@ public void testFromReader() {
303304
assertNotSame(inStr, outStr);
304305
assertEquals(inStr, outStr);
305306
}
307+
308+
@Test
309+
public void testFromReaderWillUnsubscribeBeforeCallingNextRead() {
310+
final byte[] inBytes = "test".getBytes();
311+
final AtomicInteger numReads = new AtomicInteger(0);
312+
ByteArrayInputStream is = new ByteArrayInputStream(inBytes) {
313+
314+
@Override
315+
public synchronized int read(byte[] b, int off, int len) {
316+
numReads.incrementAndGet();
317+
return super.read(b, off, len);
318+
}
319+
};
320+
StringObservable.from(new InputStreamReader(is)).first().toBlocking()
321+
.single();
322+
assertEquals(1, numReads.get());
323+
}
306324

307325
@Test
308326
public void testByLine() {

0 commit comments

Comments
 (0)