onBackpressureBufferToFile review please #9

Open · davidmoten opened this issue Apr 22, 2016 · 7 comments

@davidmoten (Owner) commented Apr 22, 2016

The ability to buffer streams to disk has been something that I've wondered about for a while.

Can I get people's comments/review of this new operator, please?

Transformers.onBackpressureBufferToFile

I'd love to get a review of this new operator in terms of:

  • what use cases have you got? (volume, serialized size, rate, platform, constraints)
  • overall approach
  • API
  • correctness of code (a big one because of the numerous sections of code subject to concurrency)
  • testing on different OS (currently just tested on linux)
  • performance (I've favoured correctness initially)
  • anything else you think of

The code is in the master branch and the runtime jar is on Maven Central, as described in the rxjava-extras README.
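For reviewers who want a feel for the API first, here is a minimal usage sketch. The no-argument overload is assumed here to fall back to standard Java serialization for Serializable items; the README documents the exact serializer overloads.

import rx.Observable;

import com.github.davidmoten.rx.Transformers;

public class BufferToFileSketch {
    public static void main(String[] args) {
        // A fast producer composed with onBackpressureBufferToFile so that
        // pending items queue on disk rather than in memory when the
        // consumer lags. The no-argument overload is an assumption; see the
        // README for the serializer overloads.
        Observable.range(1, 1000000)
                .map(i -> "line " + i)
                .compose(Transformers.<String>onBackpressureBufferToFile())
                .subscribe(line -> {
                    // deliberately slow consumer goes here
                });
    }
}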

A quick way of contributing is to run a long-running test (~30 mins) on your machine:

git clone https://github.com/davidmoten/rxjava-extras.git
cd rxjava-extras
./test-long.sh 

On non-*nix platforms, run this command instead of test-long.sh:

mvn clean install -Dmax.small=100000000 -Dmax.medium=300000 -Dmax.seconds=600 -Dloops=10000

@thomasnield commented

I noticed this feature a few weeks ago and although I haven't played with it yet, it looks really fascinating.

Typically I use your RxJava-JDBC and structure my streams into controlled, queried batches in some form. My company uses RDBMS for most data, but sometimes we do deal with really large text files. It's one thing to stream each line out of a text file, but if the data needs to be grouped up or requires some aggregated context to perform calculations, that can quickly become problematic since you cannot query a text file. Sometimes I have to turn these text files into SQLite databases just so I can query them and not have to scan and import all the data.

I haven't looked at this API much yet, but I'd be curious to see if it can scale to those kinds of challenges.

@davidmoten (Owner, Author) commented

Thanks for that @thomasnield, I'll be curious to hear if it's useful to you.

@davidmoten (Owner, Author) commented

@thomasnield considering your use case I think you need more than serialized streams. You might profit from using serialized data structures that support fast access etc. You might want to look at MapDB. I actually went there first and tried using its file-based queue in the implementation here. It worked but was slowish because it has stronger guarantees than are needed in this new operator (here we can leverage the single-producer single-consumer scenario, for instance).
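For that kind of query/aggregate-over-a-big-file use case, a minimal MapDB sketch (MapDB 3.x API; the file and map names are just placeholders):

import java.util.concurrent.ConcurrentMap;

import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;

public class MapDbSketch {
    public static void main(String[] args) {
        // File-backed map that can be closed, reopened and queried later,
        // unlike a raw text file.
        DB db = DBMaker.fileDB("aggregates.db").make();
        ConcurrentMap<String, Long> counts =
                db.hashMap("counts", Serializer.STRING, Serializer.LONG)
                  .createOrOpen();
        counts.merge("someKey", 1L, Long::sum); // accumulate while streaming
        db.close();
    }
}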

@thomasnield commented

Hmm, interesting. I'll check that out too, then. Great work on everything though! Always thrilled to see what you come up with next...

@akarnokd (Contributor) commented

I've glanced through the operator and the SPSC queues and this is what I think:

  • the drain loop looks a bit odd,
    • no isUnsubscribed check in the request-fulfilling loop
    • a null poll also calls isEmpty in finished()
    • no need to change both requested and emitted
    • see observeOn for example
  • seems to go only unbounded-in with Long.MAX_VALUE. Depending on your design choices, you could go bounded and use the files as an off-memory temporary storage
  • what happens if both reader and writer are in the same file?
  • have you looked at Aeron's architecture?
    • they are using memory-mapped files
    • the SPSC version doesn't need locking
    • release happens by setting the size field (but requires Unsafe to issue an ordered write)
    • acquire happens by reading the size field and not seeing 0 (Unsafe volatile read required); a sketch of this pattern follows the list
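To make the release/acquire point concrete, here is a sketch of that publication pattern using java.util.concurrent.atomic in place of Unsafe (lazySet is the ordered write, get is the volatile read). This illustrates the idea only; it is not Aeron's actual code.

import java.util.concurrent.atomic.AtomicInteger;

// Single-slot SPSC exchange in the Aeron style: the writer fills the
// payload first and publishes by writing the size field (ordered write);
// the reader acquires by reading the size field and seeing non-zero.
final class SpscSlot {
    private final byte[] payload = new byte[1024];
    private final AtomicInteger size = new AtomicInteger(); // 0 = empty

    // called by the single producer only
    boolean offer(byte[] data) {
        if (size.get() != 0) {
            return false; // slot still occupied by the consumer
        }
        System.arraycopy(data, 0, payload, 0, data.length);
        size.lazySet(data.length); // ordered write releases the payload
        return true;
    }

    // called by the single consumer only
    byte[] poll() {
        int n = size.get(); // volatile read acquires the payload
        if (n == 0) {
            return null; // nothing published yet
        }
        byte[] out = new byte[n];
        System.arraycopy(payload, 0, out, 0, n);
        size.lazySet(0); // hand the slot back to the producer
        return out;
    }
}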

@davidmoten (Owner, Author) commented

Thanks very much @akarnokd, great to have your expertise looking at it.

no isUnsubscribed check in the request-fulfilling loop

Thanks, corrected.

a null poll also calls isEmpty in finished()

True (if done). finished() is reused when requests == 0 further down in the loop, but I could customize the finished() path for those two cases to get more efficiency.

no need to change both requested and emitted

Thanks, corrected.

see observeOn for example

Yeah, this is sort of a combination of onBackpressureBuffer and observeOn without the batching. I did look at those operators while developing this one, but I also masochistically enjoyed knocking it up myself. I should return to both of them to improve the drain.
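For reference, a sketch of the observeOn-style drain skeleton under discussion; the field and method names are illustrative, not the operator's actual ones.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import rx.Subscriber;

final class DrainSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final AtomicInteger wip = new AtomicInteger();
    private final AtomicLong requested = new AtomicLong();
    private volatile boolean done;
    private final Subscriber<? super T> child;

    DrainSketch(Subscriber<? super T> child) {
        this.child = child;
    }

    void drain() {
        // only one thread drains; concurrent callers just bump the counter
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            long r = requested.get();
            long e = 0;
            while (e != r) {
                // unsubscription is checked inside the request-fulfilling loop
                if (child.isUnsubscribed()) {
                    return;
                }
                boolean d = done;
                T t = queue.poll();
                if (t == null) {
                    if (d) {
                        child.onCompleted();
                        return;
                    }
                    break; // queue empty; wait for the next drain call
                }
                child.onNext(t);
                e++;
            }
            // the finished() path for when requests run out
            if (e == r && done && queue.isEmpty()) {
                child.onCompleted();
                return;
            }
            // one atomic decrement per batch; no separate emitted field
            if (e != 0 && r != Long.MAX_VALUE) {
                requested.addAndGet(-e);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}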

seems to go only unbounded-in with Long.MAX_VALUE. Depending on your design choices, you could go bounded and use the files as an off-memory temporary storage

Yeah, I'll ponder this a bit further. My biggest concern with not using Long.MAX_VALUE is that some operator upstream starts queueing in memory and the memory-saving benefit of this operator is lost.
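For comparison, bounded-in would follow the standard RxJava 1.x prefetch pattern: request a fixed amount up front and top up as items are consumed, instead of requesting Long.MAX_VALUE. A sketch (PREFETCH and process are illustrative):

import rx.Subscriber;

abstract class BoundedInSubscriber<T> extends Subscriber<T> {
    private static final int PREFETCH = 128;
    private int consumed;

    @Override
    public void onStart() {
        request(PREFETCH); // bounded initial request instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(T t) {
        process(t);
        // top up upstream once half the prefetch has been consumed
        if (++consumed == PREFETCH / 2) {
            consumed = 0;
            request(PREFETCH / 2);
        }
    }

    abstract void process(T t);
}

The trade-off is the one noted above: with bounded requests, an upstream operator that buffers unboundedly in memory can reintroduce exactly the queueing this operator is meant to move to disk.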

what happens if both reader and writer are in the same file?

Do you mean in terms of correctness or performance? It works fine and I have long-running tests that use just one file, but there is more contention in FileBasedSPSCQueue. Interestingly enough, I suspect that pausing the reader at the start long enough for a rollover could reduce CPU usage in high-throughput, long-input-stream scenarios because of less contention at synchronized blocks (and adaptive locking can kick in). The contention control in FileBasedSPSCQueue favours write over read (so it's hard for read to catch up to write unless write slows down a bit).

have you looked at Aeron's architecture?

I've been following the project a little bit, but I haven't had a look at the internals for a while. I'd better go back there!

they are using memory-mapped files
the SPSC version doesn't need locking
release happens by setting the size field (but requires Unsafe to issue an ordered write)
acquire happens by reading the size field and not seeing 0 (Unsafe volatile read required)

Fantastic, I'll have a look!

@davidmoten (Owner, Author) commented

Aeron architecture description video:

http://www.infoq.com/presentations/aeron, 17:00 to 30:00, clearly explained

Looks good. I'll give it a try later.
