Make the BatchSerializer behind Arc to avoid unnecessary struct creation #8666
@tustvold, I would appreciate it if you could take a look.
LGTM but having one more reviewer would be good to make sure we are not losing any performance due to allocation-related reasons
Thank you @metesynnada -- I think this PR looks like a good step forward to me. The only thing I think should be answered is why the trait is renamed. I also left some other suggestions but I don't think any are necessary.
@devinjdangelo I think you wrote a non trivial chunk of this code -- do you have any thoughts on this PR?
@@ -149,15 +145,14 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
 /// A trait that defines the methods required for a RecordBatch serializer.
 #[async_trait]
-pub trait BatchSerializer: Unpin + Send {
+pub trait SerializationSchema: Sync + Send {
What is the rationale for renaming this trait? It doesn't seem directly related to a Schema.
🤔 I think the original name BatchSerializer better matches what the trait does.
datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -171,9 +164,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
 // this thread, so we cannot clean it up (hence any_abort_errors is true)
 any_errors = true;
 any_abort_errors = true;
-triggering_error = Some(DataFusionError::Internal(format!(
+triggering_error = Some(internal_datafusion_err!(
👍 nice cleanup
Since the previous version of the code was
Thanks for reviewing @alamb. Your suggestions make sense and we will apply them. I will consult with @metesynnada about the name choice. Maybe he has a good reason (that I cannot think of right now) for changing the name. If not, we will stick with the old name.
I just talked to @metesynnada and the names in this PR simply follow Flink's naming convention. I reverted to the old names for the scope of this PR; we may take up naming in the future as a separate topic of discussion.
Sorry I'm late to the discussion, but yes, this LGTM too. I agree that BatchSerializer makes more sense to me as a name.
Which issue does this PR close?
Closes #.
Rationale for this change
Currently, the serializer is re-created for each RecordBatch, which degrades performance when batch sizes are small.
The duplicate() method is called here:
https://github.com/apache/arrow-datafusion/blob/1737d49185e9e37c15aa432342604ee559a1069d/datafusion/core/src/datasource/file_format/write/orchestration.rs#L51-L82
It is defined here (for CSV it is used to handle the header; for JSON it is just a deep clone):
https://github.com/apache/arrow-datafusion/blob/1737d49185e9e37c15aa432342604ee559a1069d/datafusion/core/src/datasource/file_format/csv.rs#L432-L450
Also, this makes the internal buffer useless since it is re-created for each batch in this setup.
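To illustrate the idea in the rationale above, here is a minimal sketch of sharing one serializer behind an Arc instead of duplicating it per batch. It is not the code in this PR: the trait is simplified and synchronous, `Batch`, `CsvSerializer`, and `serialize_all` are hypothetical names, plain threads stand in for async tasks, and the `initial` flag is an assumed way to handle the CSV header without mutable per-serializer state.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a RecordBatch: rows of string fields.
type Batch = Vec<Vec<String>>;

/// Simplified stand-in for the trait discussed in this PR (an assumption,
/// not the real signature). Taking `&self` and requiring `Send + Sync`
/// is what lets a single instance live behind an `Arc`.
pub trait BatchSerializer: Send + Sync {
    /// `initial` is true only for the first batch of a file, so the
    /// serializer needs no mutable "header already written" state.
    fn serialize(&self, batch: &Batch, initial: bool) -> Vec<u8>;
}

/// Hypothetical CSV serializer holding only immutable configuration.
pub struct CsvSerializer {
    pub header: Vec<String>,
}

impl BatchSerializer for CsvSerializer {
    fn serialize(&self, batch: &Batch, initial: bool) -> Vec<u8> {
        let mut out = String::new();
        if initial {
            out.push_str(&self.header.join(","));
            out.push('\n');
        }
        for row in batch {
            out.push_str(&row.join(","));
            out.push('\n');
        }
        out.into_bytes()
    }
}

/// Serializes every batch on its own thread. Each thread receives a cheap
/// `Arc` clone (a reference-count bump) rather than a freshly built serializer.
pub fn serialize_all(serializer: Arc<dyn BatchSerializer>, batches: Vec<Batch>) -> Vec<u8> {
    // Spawn all workers first so they run concurrently.
    let handles: Vec<_> = batches
        .into_iter()
        .enumerate()
        .map(|(i, batch)| {
            let serializer = Arc::clone(&serializer);
            thread::spawn(move || serializer.serialize(&batch, i == 0))
        })
        .collect();

    // Join in spawn order so the output keeps the original batch order.
    handles
        .into_iter()
        .flat_map(|h| h.join().expect("serializer thread panicked"))
        .collect()
}
```

The design point is that per-batch work only clones a pointer; any state that used to force a duplicate() call (such as whether the header has been written) is passed in as an argument instead.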
What changes are included in this PR?
BatchSerializer.
type SerializerType = Arc<dyn SerializationSchema>
Are these changes tested?
Existing tests.
Are there any user-facing changes?
No.