6 changes: 5 additions & 1 deletion datafusion/proto/proto/datafusion.proto
@@ -273,7 +273,6 @@ message DmlNode{
INSERT_APPEND = 3;
INSERT_OVERWRITE = 4;
INSERT_REPLACE = 5;

}
Type dml_type = 1;
LogicalPlanNode input = 2;
@@ -726,6 +725,7 @@ message PhysicalPlanNode {
CsvSinkExecNode csv_sink = 28;
ParquetSinkExecNode parquet_sink = 29;
UnnestExecNode unnest = 30;
JsonScanExecNode json_scan = 31;
}
}

@@ -1024,6 +1024,10 @@ message CsvScanExecNode {
bool newlines_in_values = 7;
}

message JsonScanExecNode {
FileScanExecConf base_conf = 1;
}

message AvroScanExecNode {
FileScanExecConf base_conf = 1;
}
106 changes: 106 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default.

9 changes: 8 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.
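Since the generated file is not rendered here, this is roughly what prost emits for the new message (a reconstruction for reference, not the actual generated diff):

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JsonScanExecNode {
    #[prost(message, optional, tag = "1")]
    pub base_conf: ::core::option::Option<FileScanExecConf>,
}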

31 changes: 30 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
@@ -33,7 +33,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::physical_plan::AvroSource;
#[cfg(feature = "parquet")]
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig};
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, JsonSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
@@ -247,6 +247,15 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.with_file_compression_type(FileCompressionType::UNCOMPRESSED);
Ok(conf.build())
}
PhysicalPlanType::JsonScan(scan) => {
Reviewer comment (Contributor):

It seems like there is one more relevant field for JsonSource, batch_size

/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
}

Perhaps we can add that field to the serialization as well (or file a ticket to add it?)

Author reply (@westhide, Mar 19, 2025):

Currently batch_size is set from context.session_config().batch_size() when the DataSource calls open(..). Serializing batch_size would let JsonSource use the client config instead of the executor's session_config in Ballista. Should we change this behavior for all file formats (csv, parquet, avro, ...)?

.with_batch_size(context.session_config().batch_size())

Reviewer reply (Contributor):

Client config should be propagated to the executor (task context), so it should be the same.

Author reply (@westhide):

Got it, I read the code about TaskDefinition in Ballista. Thx~
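For context, a minimal sketch of how the deserialization arm below could honor a serialized batch size, if one were added later (the batch_size proto field and the trait import are assumptions, not part of this PR):

// Hypothetical: assumes JsonScanExecNode gained an `optional uint64 batch_size`
// field and that the FileSource trait is in scope for `with_batch_size`.
let source: Arc<dyn FileSource> = Arc::new(JsonSource::new());
let source = match scan.batch_size {
    // Prefer the client-serialized batch size when present.
    Some(batch_size) => source.with_batch_size(batch_size as usize),
    // Otherwise keep the default; the executor's session_config applies at open().
    None => source,
};
let scan_conf = parse_protobuf_file_scan_config(
    scan.base_conf.as_ref().unwrap(),
    registry,
    extension_codec,
    source,
)?;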

let scan_conf = parse_protobuf_file_scan_config(
    scan.base_conf.as_ref().unwrap(),
    registry,
    extension_codec,
    Arc::new(JsonSource::new()),
)?;
Ok(scan_conf.build())
}
#[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
PhysicalPlanType::ParquetScan(scan) => {
#[cfg(feature = "parquet")]
@@ -1684,6 +1693,26 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
}

if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
    let data_source = data_source_exec.data_source();
    if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
        let source = scan_conf.file_source();
        if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() {
            return Ok(protobuf::PhysicalPlanNode {
                physical_plan_type: Some(PhysicalPlanType::JsonScan(
                    protobuf::JsonScanExecNode {
                        base_conf: Some(serialize_file_scan_config(
                            scan_conf,
                            extension_codec,
                        )?),
                    },
                )),
            });
        }
    }
}

#[cfg(feature = "parquet")]
if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
let data_source_exec = exec.data_source();
9 changes: 9 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1277,6 +1277,15 @@ fn roundtrip_analyze() -> Result<()> {
)))
}

#[tokio::test]
async fn roundtrip_json_source() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_json("t1", "../core/tests/data/1.json", Default::default())
.await?;
let plan = ctx.table("t1").await?.create_physical_plan().await?;
Reviewer comment (Contributor): 👍

roundtrip_test(plan)
}
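For reference, the round trip exercised by this test can also be driven through the public byte helpers in datafusion-proto; a minimal standalone sketch (not part of this PR):

use datafusion::prelude::*;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_json("t1", "../core/tests/data/1.json", Default::default())
        .await?;
    let plan = ctx.table("t1").await?.create_physical_plan().await?;

    // Round-trip through protobuf bytes; decoding the scan relies on the
    // JsonScanExecNode variant added in this PR.
    let bytes = physical_plan_to_bytes(plan.clone())?;
    let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(format!("{plan:?}"), format!("{decoded:?}"));
    Ok(())
}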

#[test]
fn roundtrip_json_sink() -> Result<()> {
let field_a = Field::new("plan_type", DataType::Utf8, false);