Skip to content

Commit e7eaede

Browse files
committed
Merge pull request #5 from lucaslabs/observable_from_string
Create an Observable that emits each char in the source String.
2 parents 623acd9 + 1c3d9e8 commit e7eaede

File tree

2 files changed

+47
-56
lines changed

2 files changed

+47
-56
lines changed

src/main/java/rx/observables/StringObservable.java

Lines changed: 35 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class StringObservable {
5454
public static Observable<byte[]> from(final InputStream i) {
5555
return from(i, 8 * 1024);
5656
}
57-
57+
5858
/**
5959
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
6060
* @see StringObservable#using(UnsafeFunc0, Func1)
@@ -534,50 +534,6 @@ public void onNext(String t) {
534534
});
535535
}
536536

537-
public final static class Line {
538-
private final int number;
539-
private final String text;
540-
541-
public Line(int number, String text) {
542-
this.number = number;
543-
this.text = text;
544-
}
545-
546-
public int getNumber() {
547-
return number;
548-
}
549-
550-
public String getText() {
551-
return text;
552-
}
553-
554-
@Override
555-
public int hashCode() {
556-
int result = 31 + number;
557-
result = 31 * result + (text == null ? 0 : text.hashCode());
558-
return result;
559-
}
560-
561-
@Override
562-
public boolean equals(Object obj) {
563-
if (!(obj instanceof Line))
564-
return false;
565-
Line other = (Line) obj;
566-
if (number != other.number)
567-
return false;
568-
if (other.text == text)
569-
return true;
570-
if (text == null)
571-
return false;
572-
return text.equals(other.text);
573-
}
574-
575-
@Override
576-
public String toString() {
577-
return number + ":" + text;
578-
}
579-
}
580-
581537
/**
582538
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
583539
* <p>
@@ -586,13 +542,42 @@ public String toString() {
586542
* @param source
587543
* @return the Observable conaining the split lines of the source
588544
*/
589-
public static Observable<Line> byLine(Observable<String> source) {
590-
return split(source, System.getProperty("line.separator")).map(new Func1<String, Line>() {
591-
int lineNumber = 0;
545+
public static Observable<String> byLine(Observable<String> source) {
546+
return split(source, System.getProperty("line.separator"));
547+
}
592548

549+
/**
550+
* Converts an String into an Observable that emits the chars in the String.
551+
* <p>
552+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
553+
*
554+
* @param str
555+
* the source String
556+
* @return an Observable that emits each char in the source String
557+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#from">RxJava wiki: from</a>
558+
*/
559+
public static Observable<String> byCharacter(Observable<String> source) {
560+
return source.lift(new Operator<String, String>() {
593561
@Override
594-
public Line call(String text) {
595-
return new Line(lineNumber++, text);
562+
public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
563+
return new Subscriber<String>(subscriber) {
564+
@Override
565+
public void onCompleted() {
566+
subscriber.onCompleted();
567+
}
568+
569+
@Override
570+
public void onError(Throwable e) {
571+
subscriber.onError(e);
572+
}
573+
574+
@Override
575+
public void onNext(String str) {
576+
for (char c : str.toCharArray()) {
577+
subscriber.onNext(Character.toString(c));
578+
}
579+
}
580+
};
596581
}
597582
});
598583
}

src/test/java/rx/observables/StringObservableTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.mockito.Mockito.times;
2828
import static org.mockito.Mockito.verify;
2929
import static rx.observables.StringObservable.byLine;
30+
import static rx.observables.StringObservable.byCharacter;
3031
import static rx.observables.StringObservable.decode;
3132
import static rx.observables.StringObservable.encode;
3233
import static rx.observables.StringObservable.from;
@@ -52,7 +53,6 @@
5253
import rx.Observable;
5354
import rx.Observer;
5455
import rx.functions.Func1;
55-
import rx.observables.StringObservable.Line;
5656
import rx.observables.StringObservable.UnsafeFunc0;
5757
import rx.observers.TestObserver;
5858
import rx.observers.TestSubscriber;
@@ -295,7 +295,7 @@ public synchronized int read(byte[] b, int off, int len) {
295295
StringObservable.from(is).first().toBlocking().single();
296296
assertEquals(1, numReads.get());
297297
}
298-
298+
299299
@Test
300300
public void testFromReader() {
301301
final String inStr = "test";
@@ -308,10 +308,16 @@ public void testFromReader() {
308308
public void testByLine() {
309309
String newLine = System.getProperty("line.separator");
310310

311-
List<Line> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv")))
312-
.toList().toBlocking().single();
311+
List<String> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv"))).toList().toBlocking().single();
313312

314-
assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
313+
assertEquals(Arrays.asList("qwer", "asdf", "zxcv"), lines);
314+
}
315+
316+
@Test
317+
public void testByCharacter() {
318+
List<String> chars = byCharacter(Observable.from(Arrays.asList("foo", "bar"))).toList().toBlocking().single();
319+
320+
assertEquals(Arrays.asList("f", "o", "o", "b", "a", "r"), chars);
315321
}
316322

317323
@Test
@@ -347,7 +353,7 @@ public void testUsingCloseOnError() throws IOException {
347353
public int read(char[] cbuf) throws IOException {
348354
throw new IOException("boo");
349355
}
350-
356+
351357
@Override
352358
public void close() throws IOException {
353359
closed.set(true);

0 commit comments

Comments
 (0)