Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the BatchSerializer behind Arc to avoid unnecessary struct creation #8666

Merged
merged 6 commits into from
Dec 29, 2023
Merged

Make the BatchSerializer behind Arc to avoid unnecessary struct creation #8666

merged 6 commits into from
Dec 29, 2023

Conversation

metesynnada
Copy link
Contributor

@metesynnada metesynnada commented Dec 28, 2023

Which issue does this PR close?

Closes #.

Rationale for this change

Currently, the serializer is re-created for each RecordBatch, which degrades the performance while dealing with small batch sizes.

The duplicate() method is called here

https://github.com/apache/arrow-datafusion/blob/1737d49185e9e37c15aa432342604ee559a1069d/datafusion/core/src/datasource/file_format/write/orchestration.rs#L51-L82

where it is defined as (for CSV it is used for header, for JSON it is just a deep clone.)

https://github.com/apache/arrow-datafusion/blob/1737d49185e9e37c15aa432342604ee559a1069d/datafusion/core/src/datasource/file_format/csv.rs#L432-L450

Also, this makes the internal buffer useless since it is re-created for each batch in this setup.

What changes are included in this PR?

  • Renamed the BatchSerializer.
  • Make the trait methods take immutable references.
  • Make the type SerializerType = Arc<dyn SerializationSchema>
  • Handle the making CSV header false for the batches after the first batch.

Are these changes tested?

Existing tests.

Are there any user-facing changes?

No.

@github-actions github-actions bot added the core Core DataFusion crate label Dec 28, 2023
@ozankabak
Copy link
Contributor

@tustvold would appreciate it if you can take a look

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but having one more reviewer would be good to make sure we are not losing any performance due to allocation-related reasons

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @metesynnada -- I think this PR looks like a good step forward to me. The only thing I think should be answered is why the trait is renamed. I also left some other suggestions but I don't think any are necessary.

@devinjdangelo I think you wrote a non trivial chunk of this code -- do you have any thoughts on this PR?

datafusion/core/src/datasource/file_format/write/mod.rs Outdated Show resolved Hide resolved
@@ -149,15 +145,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {

/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
pub trait SerializationSchema: Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the rationale for renaming this trait? It doesn't seem directly related to a Schema 🤔 I think the original name BatchSerializer better matches what the trait does

datafusion/core/src/datasource/file_format/write/mod.rs Outdated Show resolved Hide resolved
@@ -171,9 +164,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
// this thread, so we cannot clean it up (hence any_abort_errors is true)
any_errors = true;
any_abort_errors = true;
triggering_error = Some(DataFusionError::Internal(format!(
triggering_error = Some(internal_datafusion_err!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 nice cleanup

@alamb
Copy link
Contributor

alamb commented Dec 28, 2023

LGTM but having one more reviewer would be good to make sure we are not losing any performance due to allocation-related reasons

Since the previous version of the code was cloning the serializer anyway and the API returns Bytes (a read only structure) I don't think this PR is doing more allocations than before. Arguably it is better in that it is clearer now how allocations are performed

@ozankabak
Copy link
Contributor

Thanks for reviewing @alamb. Your suggestions make sense and we will apply them. I will consult with @metesynnada about the name choice. Maybe he has a good reason (that I can not think of right now) why he thinks the name should change. If not, we will stick with the old name.

@ozankabak
Copy link
Contributor

I just talked to @metesynnada and the names in this PR are simply following Flink's naming convention. I reverted to the old names for the scope of this PR, we may take up naming in the future as a separate topic of discussion.

@ozankabak ozankabak merged commit b85a397 into apache:main Dec 29, 2023
17 of 22 checks passed
@devinjdangelo
Copy link
Contributor

Sorry I'm late to the discussion, but yes this LGTM too. I agree that BatchSerializer makes more sense to me as a name.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants