feat: Support serde for JsonSource PhysicalPlan #15311
take
alamb
left a comment
Thank you @westhide
Once CI passes I think this PR looks good to me, but I do think we should consider serializing the other field too
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
        Ok(conf.build())
    }
    PhysicalPlanType::JsonScan(scan) => {
It seems like there is one more relevant field for JsonSource, batch_size
datafusion/datafusion/datasource-json/src/source.rs
Lines 251 to 257 in 6d5e00a
    /// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
    #[derive(Clone, Default)]
    pub struct JsonSource {
        batch_size: Option<usize>,
        metrics: ExecutionPlanMetricsSet,
        projected_statistics: Option<Statistics>,
    }
Perhaps we can add that field to the serialization as well (or file a ticket to add it?)
Currently, batch_size is set from context.session_config().batch_size() when the DataSource calls open(..). Serializing batch_size would make the JsonSource use the client config instead of the executor's session_config in Ballista. Should we change this behavior for all file formats (CSV, Parquet, Avro, ...)?
    .with_batch_size(context.session_config().batch_size())
Client config should be propagated from the client to the executor (task context), so it should be the same.
Got it, I read the code about TaskDefinition in ballista. Thx~
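To make the point of this thread concrete, here is a minimal sketch of the override behavior being discussed. The types below are simplified stand-ins written for illustration, not the real DataFusion `JsonSource` or `SessionConfig`, and `open` and `effective_batch_size` are hypothetical helpers that only model the `.with_batch_size(context.session_config().batch_size())` call quoted above.

```rust
// Simplified stand-ins for illustration only; these are NOT the real
// DataFusion types, and the helper names are hypothetical.
#[derive(Clone, Debug, Default, PartialEq)]
struct JsonSource {
    batch_size: Option<usize>,
}

impl JsonSource {
    // Mirrors the builder-style call quoted in the review thread.
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }
}

// Stand-in for the session configuration available on the executor.
struct SessionConfig {
    batch_size: usize,
}

// Models the open(..) step: the session's batch size is always applied,
// overwriting whatever value the source carried before.
fn open(source: &JsonSource, config: &SessionConfig) -> JsonSource {
    source.clone().with_batch_size(config.batch_size)
}

// Returns the batch size that would actually be used at execution time,
// given an optional value that travelled with the serialized plan.
fn effective_batch_size(serialized: Option<usize>, executor_batch_size: usize) -> usize {
    let source = JsonSource { batch_size: serialized };
    let config = SessionConfig { batch_size: executor_batch_size };
    open(&source, &config)
        .batch_size
        .expect("open always sets a batch size")
}
```

In this model, `effective_batch_size(None, 8192)` and `effective_batch_size(Some(1024), 8192)` both return 8192, which is why a serialized batch_size only matters if the executor's session config differs from the client's.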
    let ctx = SessionContext::new();
    ctx.register_json("t1", "../core/tests/data/1.json", Default::default())
        .await?;
    let plan = ctx.table("t1").await?.create_physical_plan().await?;
👍
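For readers unfamiliar with this kind of test, the general shape of a serde round-trip check can be sketched as follows. This is a toy model under stated assumptions: `JsonScan` here is a hypothetical two-field struct, and `encode`/`decode` use an ad hoc byte layout rather than the protobuf encoding that datafusion-proto actually uses. It only shows the property under test, namely that decoding an encoded node gives back an equal node, including an optional batch_size.

```rust
use std::convert::TryInto;

// Hypothetical stand-in for a serialized scan node; not the real proto message.
#[derive(Clone, Debug, PartialEq)]
struct JsonScan {
    file_path: String,
    batch_size: Option<u64>,
}

// Ad hoc layout (an assumption for this sketch):
// [path length: u32 LE][path bytes][tag: 0 or 1][batch size: u64 LE if tag == 1]
fn encode(scan: &JsonScan) -> Vec<u8> {
    let mut out = Vec::new();
    let path = scan.file_path.as_bytes();
    out.extend_from_slice(&(path.len() as u32).to_le_bytes());
    out.extend_from_slice(path);
    match scan.batch_size {
        Some(n) => {
            out.push(1);
            out.extend_from_slice(&n.to_le_bytes());
        }
        None => out.push(0),
    }
    out
}

// Returns None for truncated or malformed input instead of panicking.
fn decode(bytes: &[u8]) -> Option<JsonScan> {
    let len_bytes: [u8; 4] = bytes.get(0..4)?.try_into().ok()?;
    let len = u32::from_le_bytes(len_bytes) as usize;
    let path_end = 4usize.checked_add(len)?;
    let file_path = String::from_utf8(bytes.get(4..path_end)?.to_vec()).ok()?;
    let (&tag, tail) = bytes.get(path_end..)?.split_first()?;
    let batch_size = match tag {
        0 if tail.is_empty() => None,
        1 => {
            let raw: [u8; 8] = tail.try_into().ok()?;
            Some(u64::from_le_bytes(raw))
        }
        _ => return None,
    };
    Some(JsonScan { file_path, batch_size })
}

// The round-trip property: decode(encode(x)) == x.
fn roundtrips(scan: &JsonScan) -> bool {
    decode(&encode(scan)).as_ref() == Some(scan)
}
```

A field that is left out of the encoding would make `roundtrips` fail whenever that field is set, which is the kind of gap the review comments about batch_size are pointing at.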
Thanks @westhide, the PR makes sense to me. I left one comment in the original Ballista issue, apache/datafusion-ballista#1209 (comment). If you have time, please add a test in Ballista.
milenkovicm
left a comment
I agree with @alamb; adding batch_size should be straightforward.
Thanks @westhide. For anyone following along:
Which issue does this PR close?
Rationale for this change
Fix datafusion-ballista:
Unsupported NdJsonExec plan and extension codecException
What changes are included in this PR?
Support serde for JsonSource PhysicalPlan
Are these changes tested?
Unit tests pass ✅.
Ballista integration test ✅
Are there any user-facing changes?
No