table.createWriteStream() #60

Closed
marshall007 opened this issue Feb 3, 2015 · 16 comments

@marshall007
Contributor

Now that we have support for readable streams in 1.16, it would be nice if writable streams were implemented as well. I'm not sure what the API for this should look like, given that readable streams use the stream: true optarg, but this is what I'm looking to do:

var parser = require('JSONStream').parse([true])
  , fs = require('fs')
  , r = require('rethinkdbdash')();

var file = fs.createReadStream('data.json');
var table = r.table('test').createWriteStream({ batch: 200 });

file.pipe(parser).pipe(table);

In this example we're using JSONStream to parse a large JSON file and pipe the results straight into RethinkDB in batches of 200.

@neumino
Owner

neumino commented Feb 3, 2015

It's probably a bit tricky to properly implement, but it would be pretty neat. The questions to answer are:

  • do we have to insert in sequential batches?
  • what happens to the stream in case of errors?
  • should we use multiple connections, like the import tool?

I'll try to see if there's a plan for the official driver to provide something similar, or if there's a plan to keep a connection open and just dump documents into a table without sending a query.

@thelinuxlich

+1 this would be very nice

@marshall007
Contributor Author

@neumino I have a branch up that I was playing around with if you want to check it out: marshall007/rethinkdbdash#write-stream, but I need to update to 1.16 before I can run the tests, so I have no idea whether any of it works yet.

As far as your questions go:

  • that probably depends on whether we use multiple connections or not, right? Otherwise I think it's inherently true, given how streams work.
  • I think we should emit an error event and let the user close the stream if they want.
  • I like the idea of using multiple connections; it would be interesting to see if we can outdo the Python implementation in terms of throughput.

@neumino
Owner

neumino commented Feb 5, 2015

I have a prototype in the "next" branch.
It's slightly different in the sense that it buffers documents while a query is in flight, but even if the cache does not reach the highWaterMark value, it will still flush as soon as it can.

I want to test it a bit more before releasing it, but from my local tests it works fine.
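
The core of the idea, as a rough sketch (hypothetical names and simplified logic, not the actual code in the branch):

var stream = require('stream');
var util = require('util');

// Rough sketch of the buffering idea described above -- hypothetical
// names, not the real implementation in the "next" branch.
function WritableTableStream(table, options) {
  options = options || {};
  stream.Writable.call(this, { objectMode: true, highWaterMark: options.highWaterMark });
  this._table = table;
  this._cache = [];         // documents waiting to be inserted
  this._inserting = false;  // whether an insert query is in flight
}
util.inherits(WritableTableStream, stream.Writable);

WritableTableStream.prototype._write = function(doc, encoding, done) {
  this._cache.push(doc);
  this._flushSoon();  // flush as soon as possible, even below highWaterMark
  done();             // accept the next chunk right away (the real code also applies backpressure)
};

WritableTableStream.prototype._flushSoon = function() {
  if (this._inserting || this._cache.length === 0) return;
  var batch = this._cache;
  this._cache = [];
  this._inserting = true;
  var self = this;
  this._table.insert(batch).run().then(function() {
    self._inserting = false;
    self._flushSoon();  // drain whatever accumulated while the query ran
  }).catch(function(err) {
    self.emit('error', err);
  });
};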

@marshall007
Contributor Author

@neumino I quite like your implementation and I'm looking forward to using it! I did notice a couple of potential issues, though:

  1. Calling .toWritableStream() on anything other than r.table(...) should throw. I don't see anything preventing you from calling .toWritableStream() on an arbitrary query, but maybe I'm missing something.
  2. this._pendingCallback can be overwritten by subsequent calls to WritableStream::_write, meaning that the done callback is not necessarily called on each write (see the toy sketch after this list). I don't know if this matters in practice because I'm not all that familiar with streams, but it seems fishy.
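
To illustrate point 2, reduced to a toy (not the actual code):

// Toy reduction of the hazard described in point 2, not the real implementation:
// if _write stores only the latest callback, an earlier, still-pending
// `done` is silently dropped and never invoked.
WritableStream.prototype._write = function(chunk, encoding, done) {
  this._cache.push(chunk);
  this._pendingCallback = done;  // overwrites a previous, uncalled `done`
  this._insertWhenReady();       // eventually invokes only the last stored callback
};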

@neumino
Owner

neumino commented Feb 6, 2015

@marshall007, yep, I'll make toWritableStream throw if called on something other than a table.

About _pendingCallback: there were some issues with ending the write stream. I think I have a fix; I'm testing it now :)

@marshall007
Contributor Author

Also, in WritableStream we should pass the options argument through to r.table(...).insert(..., options), specifically because you probably want conflict: replace|update in a lot of cases.
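
For example (hypothetical usage until the pass-through lands):

// Hypothetical: assuming the stream options are forwarded to insert(),
// documents whose primary key already exists get replaced instead of erroring.
var table = r.table('test').toWritableStream({ conflict: 'replace' });
file.pipe(parser).pipe(table);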

@neumino
Owner

neumino commented Feb 6, 2015

Thanks @marshall007!

@neumino
Owner

neumino commented Feb 6, 2015

So one remaining question is what to call the methods.
There are currently:

  • toStream([connection][, options])
  • toWritableStream([connection][, options])
  • toTransformStream([connection][, options])

These method names are kind of verbose. I'm leaning towards a single toStream([connection][, options]) where you can pass {writable: true} or {transform: true} in the options (the default is {readable: true}); you can also pass other options like highWaterMark, etc.

The fs module has createReadStream and createWriteStream, but I don't think that naming is much better, and createTransformStream sounds weird.
We could also go with stream instead of toStream?

@marshall007
Contributor Author

I don't have strong opinions on this, but I like the unified toStream({writable: true}) interface and would prefer it over simply stream(...) so that we're consistent with other terms like toJSON, toEpochTime, etc.

I don't really understand what your intentions are with TransformStream; could you provide an example use case? I think it would be really useful if the transform optarg took a callback function, which would allow the user to apply arbitrary transformations without having to fully implement stream.Transform themselves.

@neumino
Owner

neumino commented Feb 7, 2015

@marshall007 -- the TransformStream is just a WritableStream that also outputs what you inserted (with the generated primary key) or, in the case of an update, the updated document.
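
For example (a sketch with the in-progress syntax; the exact output shape may differ):

// Pipe documents in, read the stored documents (including generated
// primary keys) back out. `file` and `parser` as in the first comment.
var through = r.table('test').toStream({ transform: true });

file.pipe(parser).pipe(through).on('data', function(doc) {
  console.log('stored with id:', doc.id);  // doc carries the generated primary key
});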

@neumino
Owner

neumino commented Feb 8, 2015

Released in 1.16.5.

This requires node >= 0.12. I'm not super happy with the way Node fills the buffer, and especially with how it drains it. It's not as optimal as I would like it to be.
I could make the stream flow optimal, but I would have had to sacrifice the "end" event, so I eventually rolled back that change.

I'll probably try to send a PR to node itself.

The final syntax is toStream()/toStream({writable: true})/toStream({transform: true}).
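
For instance, the import from the first comment becomes:

var parser = require('JSONStream').parse([true]);
var fs = require('fs');
var r = require('rethinkdbdash')();

// Stream a large JSON file straight into a table with the new syntax.
fs.createReadStream('data.json')
  .pipe(parser)
  .pipe(r.table('test').toStream({ writable: true }));
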
Feedback is welcome :)

Extra note: Thanks @marshall007 for following up so far and providing valuable feedback! It helped a lot!

neumino closed this as completed Feb 8, 2015
@marshall007
Contributor Author

I think the issue you're describing is related to joyent/node #7348, where the writable stream emits the finish event before you've had a chance to flush the last chunk to the database, correct? I ran into the same thing when working on my implementation.

I came across TomFrost/FlushWritable after digging through the related issues in node. It may be worthwhile to look at his workaround if you haven't already.
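
The gist of that workaround, as a simplified sketch reusing the WritableTableStream sketch from above (not FlushWritable's actual code):

// Defer the real end() -- and therefore the 'finish' event -- until the
// last cached batch has been flushed to the database. `_flushRemaining`
// is a hypothetical helper that inserts whatever is still buffered and
// then calls back.
WritableTableStream.prototype.end = function() {
  var self = this;
  var args = arguments;
  this._flushRemaining(function(err) {
    if (err) return self.emit('error', err);
    stream.Writable.prototype.end.apply(self, args);
  });
};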

Regardless, nice job and I'm glad to see this implemented!

@neumino
Owner

neumino commented Feb 10, 2015

Thanks for the issue on the node repo.

Now that I think about it, I could provide a better implementation for Transform (when an efficient Readable is being piped in, I should be able to always batch-insert highWaterMark documents at a time).
toStream({writable: true}) could then be implemented as a Transform stream piped into a Writable stream.

Thanks again @marshall007 for the useful feedback!

@neumino
Owner

neumino commented Feb 11, 2015

I cannot internally pipe things, so I could only make Transform streams more efficient.

Sorry for the multiple updates. It took a few tries, but I think this is a good implementation of a single-connection writable stream.

@marshall007 -- if you drop by the Bay Area, ping me and I'll treat you to a beer :)

@marshall007
Contributor Author

@neumino haha, will do!
