Conversation

@roeap (Collaborator) commented Mar 22, 2022

Description

This PR is part of the larger PR #523; specifically, it adds an implementation of a record batch writer. The implementation is a composite of the json implementations from this crate and kafka-delta-ingest. The main addition is logic to split a record batch into partitions according to the table partitioning.
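As a rough, framework-free sketch of that splitting step (the function name and types here are illustrative stand-ins, not the PR's actual arrow-based code), rows can be grouped by their partition value and each group then written to its own file:

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for splitting a batch by partition values:
/// the real code operates on arrow RecordBatches; here a slice of
/// partition-column values stands in for the batch.
fn split_by_partition(partition_values: &[&str]) -> BTreeMap<String, Vec<usize>> {
    let mut partitions: BTreeMap<String, Vec<usize>> = BTreeMap::new();
    for (row, value) in partition_values.iter().enumerate() {
        // collect the row indices belonging to each partition value
        partitions.entry((*value).to_string()).or_default().push(row);
    }
    partitions
}

fn main() {
    let values = ["2022-03-22", "2022-03-23", "2022-03-22"];
    let parts = split_by_partition(&values);
    assert_eq!(parts.len(), 2);
    assert_eq!(parts["2022-03-22"], vec![0, 2]);
    println!("split into {} partitions", parts.len());
}
```

Each group of row indices would then be used to slice the batch into one sub-batch per partition before handing it to the parquet writer.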

Related Issue(s)

towards: #509

Documentation

@houqp (Member) left a comment:

Thanks @roeap for splitting up your PR into smaller ones :)

Looks good to me overall. One question: we already have a json writer in the deltalake::writer module; have you thought about how to better organize these two writer implementations in a more consistent namespace? I can see users getting confused between the deltalake::writer and deltalake::write::writer modules.

arrow_schema: Arc<ArrowSchema>,
writer_properties: WriterProperties,
) -> Result<ArrowWriter<InMemoryWriteableCursor>, ParquetError> {
ArrowWriter::try_new(cursor, arrow_schema, Some(writer_properties))
Member:
Looks like overkill to create a function that just calls another function with basically the same arguments?

Collaborator (Author):

true - will get rid of it.

}

/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
pub async fn flush(&mut self) -> Result<Vec<Add>, DeltaWriterError> {
Member:
The semantics of flush here differ from the one in the existing json writer; is there any particular reason you kept it different?

Collaborator (Author):
Not 100% sure which part you are referring to, but there are some things I considered :). The json writer implementation already creates a commit. Since we are hoping to eventually use this writer in a distributed context, I thought we should avoid doing that here. If you are referring to the error type, those should probably be harmonized. In general, it may be worth exploring a shared writer trait that different writers could implement?

@houqp (Member) Mar 28, 2022:
Yes, I was referring to the flush method from the json writer:

pub async fn flush(&mut self) -> Result<(), DeltaTableError> {

Since we are hoping to eventually use this writer in a distributed context, I thought we should avoid doing that here.

I think this makes sense. If so, then it's better to use a different name than flush to avoid confusion.

In general, it may be worth exploring a shared writer trait that different writers could implement?

I haven't put much thought into this yet; maybe think about what common interfaces we want to share between writers? From a quick glance, the writer method might not be a good fit because it takes a different type of input depending on the input format that needs to be supported.

UPDATE: I think the flush and flush_and_commit proposed by you could be a common interface we want to enforce to all format specific writers.
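A minimal sketch of what such a shared interface might look like; the trait name, the Add and error stand-ins, and the synchronous signatures are all simplifications for illustration here (the real methods are async and return delta-rs types):

```rust
/// Hypothetical stand-ins for the crate's Add action and error type.
#[derive(Debug, Clone, PartialEq)]
pub struct Add {
    pub path: String,
}

#[derive(Debug)]
pub struct WriterError;

/// Sketch of a shared writer trait: `flush` only writes data files and
/// returns the actions, `flush_and_commit` additionally commits them.
pub trait DeltaWriter {
    /// Write buffered data as parquet files and return the Add actions,
    /// leaving the commit to the caller (distributed-friendly).
    fn flush(&mut self) -> Result<Vec<Add>, WriterError>;

    /// Commit the given actions to the table log; stubbed in this sketch.
    fn commit(&mut self, actions: Vec<Add>) -> Result<usize, WriterError>;

    /// Default convenience path shared by all format-specific writers.
    fn flush_and_commit(&mut self) -> Result<usize, WriterError> {
        let actions = self.flush()?;
        self.commit(actions)
    }
}

/// Toy implementation to show the trait in use.
pub struct MockWriter {
    buffered: Vec<String>,
    committed: usize,
}

impl DeltaWriter for MockWriter {
    fn flush(&mut self) -> Result<Vec<Add>, WriterError> {
        // turn each buffered file path into an Add action
        Ok(self.buffered.drain(..).map(|path| Add { path }).collect())
    }

    fn commit(&mut self, actions: Vec<Add>) -> Result<usize, WriterError> {
        self.committed += actions.len();
        Ok(self.committed)
    }
}

fn main() {
    let mut writer = MockWriter {
        buffered: vec!["part-0000.parquet".to_string()],
        committed: 0,
    };
    assert_eq!(writer.flush_and_commit().unwrap(), 1);
    println!("committed {} file(s)", writer.committed);
}
```

The point of the split is that a distributed caller can call flush on many workers, collect the returned actions, and commit them in a single transaction, while a single-process caller just uses flush_and_commit.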

@houqp houqp requested review from mosyp and xianwill March 25, 2022 04:09
@roeap (Collaborator, Author) commented Mar 25, 2022

@houqp - I did spend some time thinking about how to consolidate these namespaces, since having two namespaces with essentially the same name seems like a bad idea. Beyond the namespace, there is also a bunch of logic shared between the two writer implementations. To consolidate this, we could move the json writer and record batch writer into the same namespace and expose the same API via a trait / generic. My guess is that the most prominent change would be to use the partitioning logic from the RB writer in the json case as well. I was a bit hesitant to do this directly, being unsure about the performance implications. For larger data sizes, the sorting operations used in the RB writer could be fairly expensive, while the partitioning in the json writer seems quite clean and straightforward to me.

Maybe one way to have both functionalities is to add an additional method flush_and_commit to both writers?

In any case, I am more than happy to try and consolidate these within this PR based on your suggestions.

@houqp (Member) commented Mar 28, 2022

Not only the namespace, but there is also a bunch of logic shared between the two writer implementations. In order to consolidate this we could maybe move the json writer and record batch writer into the same namespace and expose the same API via trait / generic.

I agree this would be the ideal approach 👍

My guess would be, that the most prominent change would be to use the partitioning logic from the RB writer also in the json case. I was a bit hesitant to do this directly for being unsure about the performance implications.

I recommend we leave the duplication and optimization work as follow ups so we can focus on coming up with the right abstraction and interface in this PR. Sometimes a little bit of code duplication is not a bad thing :)

Maybe one way to have both functionalities is to add an additional method flush_and_commit to both writers?

That's a good idea to keep both writers consistent too. And we can leave the flush method to only write the parquet files and return the actions. Curious whether @mosyp and @xianwill have any opinion on this.

@roeap (Collaborator, Author) commented Mar 28, 2022

@houqp - When trying to consolidate the writers a bit, I stumbled across another, I believe significant, difference in how the writers are designed. The json writer uses the add_file method on the DeltaTransaction to write files. The json writer from kafka-delta-ingest as well as the RecordBatchWriter make use of the storage backend directly. The RecordBatchWriter is generally modelled more closely on the json implementation from the kafka crate. To make the writer APIs more harmonized, I think having the writers do the writing themselves is the only feasible alternative, since in the envisioned usage within datafusion, having the transaction available in all writers seems quite cumbersome, if not prohibitive.

As such, I am unsure whether we want to keep the add_file method on the transaction implementation - to me this might get a bit confusing. Additionally, the add_file method does not seem to compute file statistics; I pulled that logic from the kafka crate as well... Just wanted to make sure before going ahead and removing it.

@houqp (Member) commented Mar 29, 2022

@roeap I think what you proposed makes total sense 👍 The add_file abstraction was probably added when we didn't have a good writer abstraction. It's a good idea to fully decouple data file IO from table log metadata IO.

@roeap (Collaborator, Author) commented Apr 1, 2022

@houqp - After looking into harmonizing the implementations, it turned out that the required adjustments ran much deeper than hoped. So I went for an easier path and migrated the Json implementation from kafka-delta-ingest. Since the record batch implementation was also modelled against that implementation, they were already somewhat similar. Still some cleaning up to do, but I hope this is going the right way.

houqp previously approved these changes Apr 2, 2022
@houqp (Member) left a comment:

LGTM, thanks @roeap !

@houqp (Member) commented Apr 2, 2022

leaving it open for the weekend in case others want to chime in :)

@wjones127 (Collaborator) left a comment:

This looks like great progress towards a rust-based writer!

I noticed a few TODOs that look like something we might want to address before merging. I think it might also be worth adding some documentation to the writer as is.

use std::convert::TryFrom;
use std::sync::Arc;

impl TryFrom<Arc<ArrowSchema>> for Schema {
Collaborator:

Does this implementation not work for the same use case?

impl TryFrom<&ArrowSchema> for schema::Schema {

Or, at the very least, should this impl be moved there?

Collaborator (Author):

Makes sense. Moved to the arrow file, and simplified it using the existing impl.
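The delegation pattern discussed here can be sketched with simplified placeholder types (ArrowSchemaLike and SchemaLike are not the real crate types, just stand-ins): the Arc-based impl forwards to the reference-based one instead of duplicating conversion logic.

```rust
use std::convert::TryFrom;
use std::sync::Arc;

/// Placeholder types standing in for ArrowSchema and the delta Schema.
#[derive(Debug, Clone, PartialEq)]
struct ArrowSchemaLike {
    fields: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
struct SchemaLike {
    fields: Vec<String>,
}

// The reference-based conversion holds the actual logic.
impl TryFrom<&ArrowSchemaLike> for SchemaLike {
    type Error = String;
    fn try_from(schema: &ArrowSchemaLike) -> Result<Self, Self::Error> {
        Ok(SchemaLike { fields: schema.fields.clone() })
    }
}

// The Arc-based conversion simply delegates to the impl above.
impl TryFrom<Arc<ArrowSchemaLike>> for SchemaLike {
    type Error = String;
    fn try_from(schema: Arc<ArrowSchemaLike>) -> Result<Self, Self::Error> {
        SchemaLike::try_from(schema.as_ref())
    }
}

fn main() {
    let arrow = Arc::new(ArrowSchemaLike { fields: vec!["id".to_string()] });
    let schema = SchemaLike::try_from(arrow).unwrap();
    assert_eq!(schema.fields, vec!["id".to_string()]);
    println!("converted {} field(s)", schema.fields.len());
}
```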

partition_columns: Vec<String>,
values: &RecordBatch,
) -> Result<Vec<PartitionResult>, DeltaWriterError> {
// TODO remove panics within closures
Collaborator:

Do you want to finish this TODO before merging?

Collaborator (Author):

Got rid of all unwraps.

Co-authored-by: Will Jones <willjones127@gmail.com>
wjones127 previously approved these changes Apr 3, 2022
houqp previously approved these changes Apr 3, 2022
@houqp (Member) left a comment:

LGTM with one minor nitpick

@roeap roeap dismissed stale reviews from houqp and wjones127 via 170d887 April 3, 2022 06:41
@houqp houqp merged commit 8c5ed46 into delta-io:main Apr 3, 2022
@houqp (Member) commented Apr 3, 2022

Thanks @roeap ! This is one big milestone :D

@roeap roeap deleted the record-batch-writer branch April 4, 2022 05:04