Reading an InputStream directly line by line #30

Closed
wants to merge 2 commits into from

Crystark

I added an OnSubscribe to deal with reading lines while the InputStream is being written to (e.g. for a large download). I hope this is useful.

@akarnokd
Member

/cc @davidmoten

@abersnaze
Contributor

How is it better than byLine(decode(from(input), "UTF-8"))? If you are just looking for convenience, maybe wrapping that line of code in a helper would be good enough. I would like to avoid adding custom OnSubscribeInputStreamToLines code that needs to be tested and maintained.

@davidmoten
Contributor

I agree with @abersnaze on this one. I suppose an alternative is split(from(new BufferedReader(new InputStreamReader(input, "UTF-8"))), "\n"). A benchmark might tell us which to use.

Incidentally, split and decode don't support backpressure; adding it would be a nice improvement. RxJavaString has been inactive for so long that I've implemented those things with backpressure in my own libraries, but it would be nice to see that functionality here under proper supervision.

@abersnaze
Contributor

@davidmoten if you've got the code, what are you waiting for? Submit a PR!

@Crystark
Author

@abersnaze @davidmoten

byLine(decode(from(input), "UTF-8")) was the first thing I tried:

    @Test
    public void testDownload() throws InterruptedException {
        String uri = ...

        Observable
            .defer(() -> {
                try {
                    InputStream is = getAwsInputStream(uri);
                    if (is != null) {
                        return StringObservable.byLine(StringObservable.decode(StringObservable.from(is), "UTF-8"));
                    }
                } catch (Throwable t) {
                    return Observable.error(t);
                }
                return Observable.empty();
            })
            .doOnSubscribe(() -> L.warn("doOnSubscribe"))
            .doOnNext((t) -> L.warn("doOnNext - " + t))
            .subscribe();
    }

This prints doOnSubscribe and then hangs; I can see network activity as the file is being downloaded.

However, if I use OnSubscribeInputStreamToLines:

    @Test
    public void testDownload() throws InterruptedException {
        String uri = ...

        Observable
            .defer(() -> {
                try {
                    InputStream is = getAwsInputStream(uri);
                    if (is != null) {
                        return Observable.create(new OnSubscribeInputStreamToLines(is));
                    }
                } catch (Throwable t) {
                    return Observable.error(t);
                }
                return Observable.empty();
            })
            .doOnSubscribe(() -> L.warn("doOnSubscribe"))
            .doOnNext((t) -> L.warn("doOnNext - " + t))
            .subscribe();
    }

I see my doOnNext being printed as the file is being downloaded. Maybe I just missed something?

@Crystark
Author

I just tested with the following, which works:

StringObservable.split(StringObservable.from(new BufferedReader(new InputStreamReader(is, Charsets.UTF_8))), "\n")

So I retried, removing byLine and replacing it with:

StringObservable.split(StringObservable.decode(StringObservable.from(is), "UTF-8"), "\n");

And that works. The problem was actually just the line separator: it wasn't working because byLine doesn't handle all kinds of line separators (CR, LF, and CRLF). In my OnSubscribe I was using readLine, which handles that:

Reads a line of text. A line is considered to be terminated by any one of a line feed ('\n'), a carriage return ('\r'), or a carriage return followed immediately by a linefeed.
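
To make the quoted readLine behaviour concrete, here is a minimal standalone sketch that uses only the JDK (no RxJava). The class name ReadLineDemo and the helper readLines are hypothetical and are not part of the PR; the input deliberately mixes LF, CRLF and CR terminators.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ReadLineDemo {

    // Hypothetical helper: collects every line that BufferedReader.readLine() yields.
    static List<String> readLines(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null at end of stream and strips the terminator.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // One LF, one CRLF, one lone CR, and a final line without a terminator.
        byte[] mixed = "a\nb\r\nc\rd".getBytes(StandardCharsets.UTF_8);
        System.out.println(readLines(new ByteArrayInputStream(mixed))); // prints [a, b, c, d]
    }
}
```

Because readLine decides where a line ends by itself, the result does not depend on the platform's line.separator property.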

I'll close this PR. Maybe it is worth opening an issue about byLine? And maybe also an issue to tackle backpressure on decode and split, as @davidmoten suggested? For now I'll use my OnSubscribe, as it works fine and has backpressure support.

@Crystark closed this Sep 16, 2015

@abersnaze
Contributor

Strange, byLine() is literally this:

public static Observable<String> byLine(Observable<String> source) {
    return split(source, System.getProperty("line.separator"));
}

and this code confirms System.getProperty("line.separator") and "\n" (on my machine) are the same.

System.out.println("System.getProperty(\"line.separator\") = " + Arrays.toString(System.getProperty("line.separator").getBytes()));
System.out.println("\"\\n\" = " + Arrays.toString("\n".getBytes()));

outputs this

System.getProperty("line.separator") = [10]
"\n" = [10]

@Crystark Could you report what the middle snippet of code outputs on your system?

@Crystark
Author

I'm running my tests on Windows, so yeah, the line separator is CRLF:

System.getProperty("line.separator") = [13, 10]
"\n" = [10]
"\r" = [13]
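
A rough model of why the first test seemed to hang, assuming that a fixed-separator split buffers incoming text until it finds the separator and only flushes the remainder when the stream ends. ChunkSplitterModel is a hypothetical class written for illustration; it is not the library's code.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitterModel {
    private final String separator;
    private final StringBuilder pending = new StringBuilder();

    ChunkSplitterModel(String separator) {
        this.separator = separator;
    }

    // Feeds one chunk and returns the elements completed by it.
    List<String> onChunk(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int idx;
        while ((idx = pending.indexOf(separator)) >= 0) {
            complete.add(pending.substring(0, idx));
            pending.delete(0, idx + separator.length());
        }
        return complete;
    }

    // End of stream: whatever is still buffered becomes the last element.
    String onEnd() {
        return pending.toString();
    }

    public static void main(String[] args) {
        // Separator is CRLF (Windows line.separator), but the data only contains LF.
        ChunkSplitterModel crlf = new ChunkSplitterModel("\r\n");
        System.out.println(crlf.onChunk("a\nb\n").size()); // prints 0
        System.out.println(crlf.onChunk("c\n").size());    // prints 0

        // Separator matches the data: lines come out as chunks arrive.
        ChunkSplitterModel lf = new ChunkSplitterModel("\n");
        System.out.println(lf.onChunk("a\nb\n").size());   // prints 2
    }
}
```

Under that assumption, an LF-only file read with a CRLF separator produces no element until the whole download has finished, which would match seeing doOnSubscribe and then nothing.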

IMO, the system's line separator should be ignored and byLine should be able to split on any kind of line separator, whatever the system.
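
One possible way to get separator-agnostic behaviour is to split on a regular expression that accepts CRLF, CR or LF. The sketch below works on a complete String with plain JDK classes; the class AnyLineSeparator and its methods are hypothetical, and a streaming version would additionally have to handle a CR that arrives at the very end of one chunk with its LF at the start of the next.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class AnyLineSeparator {

    // CRLF is listed first so that "\r\n" counts as a single line break.
    private static final Pattern LINE_BREAK = Pattern.compile("\r\n|\r|\n");

    // Splits on any of the three common line terminators.
    static List<String> splitLines(String text) {
        return Arrays.asList(LINE_BREAK.split(text));
    }

    // Splits on one fixed, literal separator (the behaviour being compared against).
    static List<String> splitOn(String text, String separator) {
        return Arrays.asList(text.split(Pattern.quote(separator)));
    }

    public static void main(String[] args) {
        System.out.println(splitLines("a\nb\r\nc\rd"));            // prints [a, b, c, d]
        // A CRLF separator never matches LF-only text, so it stays in one piece.
        System.out.println(splitOn("a\nb\nc", "\r\n").size());     // prints 1
    }
}
```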

@abersnaze
Contributor

@Crystark check out #31 to see if it works for you.
