Skip to content

Create an Observable that emits each char in the source String. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 35 additions & 50 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class StringObservable {
public static Observable<byte[]> from(final InputStream i) {
return from(i, 8 * 1024);
}

/**
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
* @see StringObservable#using(UnsafeFunc0, Func1)
Expand Down Expand Up @@ -532,50 +532,6 @@ public void onNext(String t) {
});
}

public final static class Line {
private final int number;
private final String text;

public Line(int number, String text) {
this.number = number;
this.text = text;
}

public int getNumber() {
return number;
}

public String getText() {
return text;
}

@Override
public int hashCode() {
int result = 31 + number;
result = 31 * result + (text == null ? 0 : text.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Line))
return false;
Line other = (Line) obj;
if (number != other.number)
return false;
if (other.text == text)
return true;
if (text == null)
return false;
return text.equals(other.text);
}

@Override
public String toString() {
return number + ":" + text;
}
}

/**
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
* <p>
Expand All @@ -584,13 +540,42 @@ public String toString() {
* @param source
* @return the Observable conaining the split lines of the source
*/
public static Observable<Line> byLine(Observable<String> source) {
return split(source, System.getProperty("line.separator")).map(new Func1<String, Line>() {
int lineNumber = 0;
public static Observable<String> byLine(Observable<String> source) {
return split(source, System.getProperty("line.separator"));
}

/**
* Converts an String into an Observable that emits the chars in the String.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="">
*
* @param str
* the source String
* @return an Observable that emits each char in the source String
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#from">RxJava wiki: from</a>
*/
public static Observable<String> byCharacter(Observable<String> source) {
return source.lift(new Operator<String, String>() {
@Override
public Line call(String text) {
return new Line(lineNumber++, text);
public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
return new Subscriber<String>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(String str) {
for (char c : str.toCharArray()) {
subscriber.onNext(Character.toString(c));
}
}
};
}
});
}
Expand Down
18 changes: 12 additions & 6 deletions src/test/java/rx/observables/StringObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.observables.StringObservable.byLine;
import static rx.observables.StringObservable.byCharacter;
import static rx.observables.StringObservable.decode;
import static rx.observables.StringObservable.encode;
import static rx.observables.StringObservable.from;
Expand All @@ -52,7 +53,6 @@
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.observables.StringObservable.Line;
import rx.observables.StringObservable.UnsafeFunc0;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -277,7 +277,7 @@ public synchronized int read(byte[] b, int off, int len) {
StringObservable.from(is).first().toBlocking().single();
assertEquals(1, numReads.get());
}

@Test
public void testFromReader() {
final String inStr = "test";
Expand All @@ -290,10 +290,16 @@ public void testFromReader() {
public void testByLine() {
String newLine = System.getProperty("line.separator");

List<Line> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv")))
.toList().toBlocking().single();
List<String> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv"))).toList().toBlocking().single();

assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
assertEquals(Arrays.asList("qwer", "asdf", "zxcv"), lines);
}

@Test
public void testByCharacter() {
List<String> chars = byCharacter(Observable.from(Arrays.asList("foo", "bar"))).toList().toBlocking().single();

assertEquals(Arrays.asList("f", "o", "o", "b", "a", "r"), chars);
}

@Test
Expand Down Expand Up @@ -329,7 +335,7 @@ public void testUsingCloseOnError() throws IOException {
public int read(char[] cbuf) throws IOException {
throw new IOException("boo");
}

@Override
public void close() throws IOException {
closed.set(true);
Expand Down