Record Batch Writer #573
Conversation
Thanks @roeap for splitting up your PR into smaller ones :)
Looks good to me overall, one question I have is we already have a json writer in the deltalake::writer module, have you thought of how to better organize these two writer implementations in a more consistent namespace? I can see users getting confused between deltalake::writer and deltalake::write::writer modules.
rust/src/write/writer.rs (Outdated)
    arrow_schema: Arc<ArrowSchema>,
    writer_properties: WriterProperties,
) -> Result<ArrowWriter<InMemoryWriteableCursor>, ParquetError> {
    ArrowWriter::try_new(cursor, arrow_schema, Some(writer_properties))
Looks like overkill to create a function that just calls another function with basically the same arguments?
true - will get rid of it.
rust/src/write/writer.rs (Outdated)
}

/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
pub async fn flush(&mut self) -> Result<Vec<Add>, DeltaWriterError> {
The semantics of flush here differ from those of the existing json writer; is there any particular reason why you chose to keep it different?
Not 100% sure which part you are referring to, but there are some things I considered :). The json writer implementation already creates a commit. Since we are hoping to eventually use this writer in a distributed context, I thought we should avoid doing so here. If you are referring to the error type, these should probably be harmonized. In general, it may be worth exploring a shared writer trait that different writers could implement?
Yes, I was referring to the flush method from the json writer:
Line 94 in 346f51a:
pub async fn flush(&mut self) -> Result<(), DeltaTableError> {
Since we are hoping to eventually use this writer in a distributed context, I thought we should avoid doing so here.
I think this makes sense. If so, then it's better to use a name other than flush to avoid confusion.
In general it may be worth exploring to have a shared writer trait that different writers could implement?
I haven't put much thought into this yet; maybe start by thinking about what common interfaces we want to share between writers? From a quick glance, the write method might not be a good fit because it takes a different type of input depending on the input format that needs to be supported.
UPDATE: I think the flush and flush_and_commit methods you proposed could be a common interface we want to enforce for all format-specific writers.
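To make the proposal above concrete, here is a hedged, synchronous sketch of what such a shared writer trait could look like. All names (DeltaWriter, the in-memory impl, the Add stand-in) are illustrative assumptions, not the actual delta-rs API, and the real methods would likely be async:

```rust
/// Simplified stand-in for the `Add` action produced per written data file.
#[derive(Debug, Clone, PartialEq)]
struct Add {
    path: String,
}

/// Hypothetical shared interface; `T` is the format-specific input type
/// (json values for one writer, record batches for another).
trait DeltaWriter<T> {
    type Error;

    /// Buffer a chunk of input without touching storage.
    fn write(&mut self, values: T) -> Result<(), Self::Error>;

    /// Write buffered data to storage and return the `Add` actions,
    /// leaving the commit to the caller (useful in distributed contexts).
    fn flush(&mut self) -> Result<Vec<Add>, Self::Error>;

    /// Convenience path for local use: flush and also commit to the log,
    /// returning the new table version.
    fn flush_and_commit(&mut self) -> Result<i64, Self::Error>;
}

/// Toy in-memory implementation, just to exercise the trait shape.
struct InMemoryJsonWriter {
    buffer: Vec<String>,
    version: i64,
}

impl DeltaWriter<Vec<String>> for InMemoryJsonWriter {
    type Error = String;

    fn write(&mut self, values: Vec<String>) -> Result<(), String> {
        self.buffer.extend(values);
        Ok(())
    }

    fn flush(&mut self) -> Result<Vec<Add>, String> {
        let adds = if self.buffer.is_empty() {
            vec![]
        } else {
            vec![Add { path: format!("part-{}.parquet", self.version + 1) }]
        };
        self.buffer.clear();
        Ok(adds)
    }

    fn flush_and_commit(&mut self) -> Result<i64, String> {
        self.flush()?;
        self.version += 1;
        Ok(self.version)
    }
}
```

The key design point from the thread survives the sketch: flush returns the Add actions so a driver can collect them from many workers and create a single commit, while flush_and_commit keeps the simple single-process path ergonomic.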
@houqp - I did spend some time thinking about how to consolidate these namespaces, since having two namespaces with essentially the same name seems like a bad idea. Beyond the namespace, there is also a good amount of logic shared between the two writer implementations. To consolidate this, we could move the json writer and record batch writer into the same namespace and expose the same API via a trait / generic. My guess is that the most prominent change would be to use the partitioning logic from the RecordBatch writer in the json case as well. I was a bit hesitant to do this directly, being unsure about the performance implications: for larger data sizes, the sorting operations used in the RecordBatch writer could be fairly expensive, while the partitioning in the json writer seems quite clean and straightforward to me. Maybe one way to have both behaviours is to add an additional method. In any case, I am more than happy to try and consolidate these within this PR based on your suggestions.
I agree this would be the ideal approach 👍
I recommend we leave the duplication and optimization work as follow ups so we can focus on coming up with the right abstraction and interface in this PR. Sometimes a little bit of code duplication is not a bad thing :)
That's a good idea to keep both writers consistent too. And we can leave the optimization work as a follow-up.
@houqp - When trying to consolidate the writers a bit, I stumbled across another, I believe significant, difference in how the writers are designed. The json writer uses the add_file abstraction. As such, I am unsure if we would want to keep the add_file abstraction at all.
@roeap I think what you proposed makes total sense 👍 The add_file abstraction was probably added when we didn't have a good writer abstraction. It's a good idea to fully decouple data file IO and table log metadata IO.
@houqp - After looking a bit into harmonizing the implementations, it turned out that the required adjustments ran much deeper than I had hoped. So I went for an easier path and migrated the json implementation from kafka-delta-ingest. Since the record batch implementation was also modelled against that implementation, they were already somewhat similar. Still some cleaning up to do, but I hope this is going the right way.
LGTM, thanks @roeap !
Leaving it open for the weekend in case others want to chime in :)
This looks like great progress towards a rust-based writer!
I noticed a few TODOs that look like something we might want to address before merging. I think it might also be worth adding some documentation to the writer as is.
rust/src/writer/mod.rs (Outdated)
use std::convert::TryFrom;
use std::sync::Arc;

impl TryFrom<Arc<ArrowSchema>> for Schema {
Does this implementation not work for the same use case?
delta-rs/rust/src/delta_arrow.rs, Line 166 in 0b99151:
impl TryFrom<&ArrowSchema> for schema::Schema {
Or, at the very least, should this impl be moved there?
Makes sense. Moved it to the arrow file and simplified it using the existing impl.
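The simplification discussed here — delegating the Arc-based conversion to the existing reference-based impl — can be sketched with stand-in types. The structs below are toy placeholders, not the real ArrowSchema or delta Schema; only the delegation pattern is the point:

```rust
use std::convert::TryFrom;
use std::sync::Arc;

// Toy stand-in for arrow's schema type.
struct ArrowSchema {
    fields: Vec<String>,
}

// Toy stand-in for the delta schema type.
#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

// The pre-existing reference-based conversion (analogous to the impl
// in delta_arrow.rs referenced above).
impl TryFrom<&ArrowSchema> for Schema {
    type Error = String;

    fn try_from(s: &ArrowSchema) -> Result<Self, Self::Error> {
        Ok(Schema { fields: s.fields.clone() })
    }
}

// The Arc-based impl adds nothing new: it dereferences the Arc and
// delegates to the reference-based conversion.
impl TryFrom<Arc<ArrowSchema>> for Schema {
    type Error = String;

    fn try_from(s: Arc<ArrowSchema>) -> Result<Self, Self::Error> {
        Schema::try_from(s.as_ref())
    }
}
```

Keeping the logic in a single reference-based impl means both call sites stay in sync automatically, which is what the review comment was asking for.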
rust/src/writer/record_batch.rs (Outdated)
    partition_columns: Vec<String>,
    values: &RecordBatch,
) -> Result<Vec<PartitionResult>, DeltaWriterError> {
    // TODO remove panics within closures
Do you want to finish this TODO before merging?
Got rid of all the unwraps.
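The partitioning this thread revolves around can be illustrated with a simplified, std-only sketch: group row indices by their partition-column values and derive a Hive-style path per group, returning errors instead of panicking. The real implementation operates on Arrow arrays and returns PartitionResult values; the types and shapes here are illustrative assumptions:

```rust
use std::collections::BTreeMap;

/// One partition of the input: its Hive-style path prefix plus the row
/// indices that belong to it (illustrative stand-in for `PartitionResult`).
#[derive(Debug, PartialEq)]
struct Partition {
    path: String,
    rows: Vec<usize>,
}

/// Split rows into partitions. `values[c][r]` holds the value of partition
/// column `c` in row `r`. Errors are returned rather than panicking inside
/// the loop, mirroring the "remove panics within closures" TODO above.
fn divide_by_partition_values(
    partition_columns: &[String],
    values: &[Vec<String>],
) -> Result<Vec<Partition>, String> {
    if partition_columns.len() != values.len() {
        return Err("one value column per partition column expected".to_string());
    }
    let num_rows = values.first().map(|c| c.len()).unwrap_or(0);
    // BTreeMap keeps the resulting partitions in deterministic order.
    let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
    for row in 0..num_rows {
        // Hive-style path segment per column: "col1=val1/col2=val2".
        let path = partition_columns
            .iter()
            .zip(values.iter())
            .map(|(col, vals)| format!("{}={}", col, vals[row]))
            .collect::<Vec<_>>()
            .join("/");
        groups.entry(path).or_default().push(row);
    }
    Ok(groups
        .into_iter()
        .map(|(path, rows)| Partition { path, rows })
        .collect())
}
```

The actual record batch writer achieves the same grouping with Arrow sort and take kernels rather than per-row string building, which is where the sorting cost discussed earlier in the thread comes from.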
Co-authored-by: Will Jones <willjones127@gmail.com>
LGTM with one minor nitpick
Thanks @roeap ! This is one big milestone :D
Description
This PR is part of the larger PR #523; specifically, an implementation of a record batch writer. The implementation is a composite of the json implementations from this crate and kafka-delta-ingest. The main addition is some logic to split a record batch into partitions according to the table partitioning.

Related Issue(s)
towards: #509
Documentation