Skip to content

Commit 5199a24

Browse files
committed
use AbstractOnSubscribe for StringObservable.from(InputStream) and from(Reader)
1 parent d28ce97 commit 5199a24

File tree

5 files changed

+181
-105
lines changed

5 files changed

+181
-105
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
buildscript {
22
repositories { jcenter() }
3-
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:2.+' }
3+
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:2.2.3' }
44
}
55

66
apply plugin: 'rxjava-project'
77
apply plugin: 'java'
88

99
dependencies {
10-
compile 'io.reactivex:rxjava:1.0.+'
10+
compile 'io.reactivex:rxjava:1.0.9'
1111
testCompile 'junit:junit-dep:4.10'
1212
testCompile 'org.mockito:mockito-core:1.8.5'
1313
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package rx.internal.operators;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.Arrays;
6+
7+
import rx.Subscriber;
8+
import rx.observables.AbstractOnSubscribe;
9+
10+
public final class OnSubscribeInputStream extends AbstractOnSubscribe<byte[], InputStream> {
11+
12+
private final InputStream is;
13+
private final int size;
14+
15+
public OnSubscribeInputStream(InputStream is, int size) {
16+
this.is = is;
17+
this.size = size;
18+
}
19+
20+
@Override
21+
protected InputStream onSubscribe(Subscriber<? super byte[]> subscriber) {
22+
return is;
23+
}
24+
25+
@Override
26+
protected void next(SubscriptionState<byte[], InputStream> state) {
27+
28+
InputStream is = state.state();
29+
byte[] buffer = new byte[size];
30+
try {
31+
int count = is.read(buffer);
32+
if (count == -1)
33+
state.onCompleted();
34+
else if (count < size)
35+
state.onNext(Arrays.copyOf(buffer, count));
36+
else
37+
state.onNext(buffer);
38+
} catch (IOException e) {
39+
state.onError(e);
40+
}
41+
}
42+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx.internal.operators;
2+
3+
import java.io.IOException;
4+
import java.io.Reader;
5+
6+
import rx.Subscriber;
7+
import rx.observables.AbstractOnSubscribe;
8+
9+
public final class OnSubscribeReader extends AbstractOnSubscribe<String, Reader> {
10+
11+
private final Reader reader;
12+
private final int size;
13+
14+
public OnSubscribeReader(Reader reader, int size) {
15+
this.reader = reader;
16+
this.size = size;
17+
}
18+
19+
@Override
20+
protected Reader onSubscribe(Subscriber<? super String> subscriber) {
21+
return reader;
22+
}
23+
24+
@Override
25+
protected void next(SubscriptionState<String, Reader> state) {
26+
27+
Reader reader = state.state();
28+
char[] buffer = new char[size];
29+
try {
30+
int count = reader.read(buffer);
31+
if (count == -1)
32+
state.onCompleted();
33+
else
34+
state.onNext(String.valueOf(buffer, 0, count));
35+
} catch (IOException e) {
36+
state.onError(e);
37+
}
38+
}
39+
}

src/main/java/rx/observables/StringObservable.java

Lines changed: 23 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616
package rx.observables;
1717

1818
import rx.Observable;
19-
import rx.Observable.OnSubscribe;
2019
import rx.Observable.Operator;
2120
import rx.Subscriber;
2221
import rx.functions.Action1;
2322
import rx.functions.Func0;
2423
import rx.functions.Func1;
2524
import rx.functions.Func2;
25+
import rx.internal.operators.OnSubscribeInputStream;
26+
import rx.internal.operators.OnSubscribeReader;
2627

2728
import java.io.Closeable;
2829
import java.io.IOException;
@@ -42,8 +43,8 @@
4243

4344
public class StringObservable {
4445
/**
45-
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
46-
* {@code byte[]}s
46+
* Reads bytes from a source {@link InputStream} and outputs {@link Observable} of
47+
* {@code byte[]}s. Supports backpressure.
4748
* <p>
4849
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
4950
*
@@ -103,44 +104,24 @@ public void call(S resource) {
103104
}
104105

105106
/**
106-
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
107-
* {@code byte[]}s
107+
* Reads bytes from a source {@link InputStream} and outputs {@link Observable} of
108+
* {@code byte[]}s. Supports backpressure.
108109
* <p>
109110
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
110111
*
111-
* @param i
112+
* @param is
112113
* Source {@link InputStream}
113114
* @param size
114115
* internal buffer size
115116
* @return the Observable containing read byte arrays from the input
116117
*/
117-
public static Observable<byte[]> from(final InputStream i, final int size) {
118-
return Observable.create(new OnSubscribe<byte[]>() {
119-
@Override
120-
public void call(Subscriber<? super byte[]> o) {
121-
byte[] buffer = new byte[size];
122-
try {
123-
if (o.isUnsubscribed())
124-
return;
125-
int n = i.read(buffer);
126-
while (n != -1 && !o.isUnsubscribed()) {
127-
o.onNext(Arrays.copyOf(buffer, n));
128-
if (!o.isUnsubscribed())
129-
n = i.read(buffer);
130-
}
131-
} catch (IOException e) {
132-
o.onError(e);
133-
}
134-
if (o.isUnsubscribed())
135-
return;
136-
o.onCompleted();
137-
}
138-
});
118+
public static Observable<byte[]> from(final InputStream is, final int size) {
119+
return Observable.create(new OnSubscribeInputStream(is, size));
139120
}
140121

141122
/**
142-
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
143-
* {@link String}s
123+
* Reads characters from a source {@link Reader} and outputs {@link Observable} of
124+
* {@link String}s. Supports backpressure.
144125
* <p>
145126
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
146127
*
@@ -153,8 +134,8 @@ public static Observable<String> from(final Reader i) {
153134
}
154135

155136
/**
156-
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
157-
* {@link String}s
137+
* Reads characters from a source {@link Reader} and outputs {@link Observable} of
138+
* {@link String}s. Supports backpressure.
158139
* <p>
159140
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.from.png" alt="">
160141
*
@@ -164,33 +145,12 @@ public static Observable<String> from(final Reader i) {
164145
* internal buffer size
165146
* @return the Observable of Strings read from the source
166147
*/
167-
public static Observable<String> from(final Reader i, final int size) {
168-
return Observable.create(new OnSubscribe<String>() {
169-
@Override
170-
public void call(Subscriber<? super String> o) {
171-
char[] buffer = new char[size];
172-
try {
173-
if (o.isUnsubscribed())
174-
return;
175-
int n = 0;
176-
n = i.read(buffer);
177-
while (n != -1 && !o.isUnsubscribed()) {
178-
o.onNext(new String(buffer, 0, n));
179-
if (!o.isUnsubscribed())
180-
n = i.read(buffer);
181-
}
182-
} catch (IOException e) {
183-
o.onError(e);
184-
}
185-
if (o.isUnsubscribed())
186-
return;
187-
o.onCompleted();
188-
}
189-
});
148+
public static Observable<String> from(final Reader reader, final int size) {
149+
return Observable.create(new OnSubscribeReader(reader, size));
190150
}
191151

192152
/**
193-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
153+
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
194154
* and where handles when a multibyte character spans two chunks.
195155
* <p>
196156
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
@@ -204,7 +164,7 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
204164
}
205165

206166
/**
207-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
167+
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
208168
* and where handles when a multibyte character spans two chunks.
209169
* <p>
210170
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
@@ -218,8 +178,8 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
218178
}
219179

220180
/**
221-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
222-
* and where it handles when a multibyte character spans two chunks.
181+
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
182+
* and handles when a multibyte character spans two chunks.
223183
* This method allows for more control over how malformed and unmappable characters are handled.
224184
* <p>
225185
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.decode.png" alt="">
@@ -311,7 +271,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
311271
}
312272

313273
/**
314-
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
274+
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
315275
* <p>
316276
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
317277
*
@@ -324,7 +284,7 @@ public static Observable<byte[]> encode(Observable<String> src, String charsetNa
324284
}
325285

326286
/**
327-
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
287+
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
328288
* <p>
329289
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
330290
*
@@ -337,7 +297,7 @@ public static Observable<byte[]> encode(Observable<String> src, Charset charset)
337297
}
338298

339299
/**
340-
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
300+
* Encodes a possibly infinite stream of strings into an Observable of byte arrays.
341301
* This method allows for more control over how malformed and unmappable characters are handled.
342302
* <p>
343303
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/St.encode.png" alt="">
@@ -548,7 +508,7 @@ public static Observable<String> byLine(Observable<String> source) {
548508
}
549509

550510
/**
551-
* Converts an String into an Observable that emits the chars in the String.
511+
* Converts a String into an Observable that emits the chars in the String.
552512
* <p>
553513
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
554514
*

0 commit comments

Comments
 (0)