Skip to content

Commit 22309f9

Browse files
authored
feat: change datafusion-proto to use TaskContext rather thanSessionContext for physical plan serialization (#17601)
* change session context to task context in physical proto ... * fix compilation issue * remove `RuntimeEnv` from few function arguments * update upgrading guide
1 parent 1a6c79b commit 22309f9

File tree

12 files changed

+255
-295
lines changed

12 files changed

+255
-295
lines changed

benchmarks/src/imdb/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ mod tests {
570570
let plan = ctx.sql(&query).await?;
571571
let plan = plan.create_physical_plan().await?;
572572
let bytes = physical_plan_to_bytes(plan.clone())?;
573-
let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
573+
let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
574574
let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
575575
let plan2_formatted =
576576
format!("{}", displayable(plan2.as_ref()).indent(false));

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ mod tests {
424424
let plan = ctx.sql(&query).await?;
425425
let plan = plan.create_physical_plan().await?;
426426
let bytes = physical_plan_to_bytes(plan.clone())?;
427-
let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
427+
let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
428428
let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
429429
let plan2_formatted =
430430
format!("{}", displayable(plan2.as_ref()).indent(false));

datafusion-examples/examples/composed_extension_codec.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@
3232
3333
use std::any::Any;
3434
use std::fmt::Debug;
35-
use std::ops::Deref;
3635
use std::sync::Arc;
3736

3837
use datafusion::common::internal_err;
3938
use datafusion::common::Result;
40-
use datafusion::logical_expr::registry::FunctionRegistry;
39+
use datafusion::execution::TaskContext;
4140
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4241
use datafusion::prelude::SessionContext;
4342
use datafusion_proto::physical_plan::{
@@ -71,9 +70,8 @@ async fn main() {
7170
.expect("to proto");
7271

7372
// deserialize proto back to execution plan
74-
let runtime = ctx.runtime_env();
7573
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
76-
.try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
74+
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
7775
.expect("from proto");
7876

7977
// assert that the original and deserialized execution plans are equal
@@ -124,7 +122,7 @@ impl ExecutionPlan for ParentExec {
124122
fn execute(
125123
&self,
126124
_partition: usize,
127-
_context: Arc<datafusion::execution::TaskContext>,
125+
_context: Arc<TaskContext>,
128126
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
129127
unreachable!()
130128
}
@@ -139,7 +137,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
139137
&self,
140138
buf: &[u8],
141139
inputs: &[Arc<dyn ExecutionPlan>],
142-
_registry: &dyn FunctionRegistry,
140+
_ctx: &TaskContext,
143141
) -> Result<Arc<dyn ExecutionPlan>> {
144142
if buf == "ParentExec".as_bytes() {
145143
Ok(Arc::new(ParentExec {
@@ -200,7 +198,7 @@ impl ExecutionPlan for ChildExec {
200198
fn execute(
201199
&self,
202200
_partition: usize,
203-
_context: Arc<datafusion::execution::TaskContext>,
201+
_context: Arc<TaskContext>,
204202
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
205203
unreachable!()
206204
}
@@ -215,7 +213,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
215213
&self,
216214
buf: &[u8],
217215
_inputs: &[Arc<dyn ExecutionPlan>],
218-
_registry: &dyn FunctionRegistry,
216+
_ctx: &TaskContext,
219217
) -> Result<Arc<dyn ExecutionPlan>> {
220218
if buf == "ChildExec".as_bytes() {
221219
Ok(Arc::new(ChildExec {}))

datafusion/ffi/src/plan_properties.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
181181

182182
// TODO Extend FFI to get the registry and codex
183183
let default_ctx = SessionContext::new();
184+
let task_context = default_ctx.task_ctx();
184185
let codex = DefaultPhysicalExtensionCodec {};
185186

186187
let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
@@ -190,7 +191,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
190191
.map_err(|e| DataFusionError::External(Box::new(e)))?;
191192
let sort_exprs = parse_physical_sort_exprs(
192193
&proto_output_ordering.physical_sort_expr_nodes,
193-
&default_ctx,
194+
&task_context,
194195
&schema,
195196
&codex,
196197
)?;
@@ -202,7 +203,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
202203
.map_err(|e| DataFusionError::External(Box::new(e)))?;
203204
let partitioning = parse_protobuf_partitioning(
204205
Some(&proto_output_partitioning),
205-
&default_ctx,
206+
&task_context,
206207
&schema,
207208
&codex,
208209
)?

datafusion/ffi/src/udaf/accumulator_args.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,17 @@ impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {
116116
let schema = Schema::try_from(&value.schema.0)?;
117117

118118
let default_ctx = SessionContext::new();
119+
let task_ctx = default_ctx.task_ctx();
119120
let codex = DefaultPhysicalExtensionCodec {};
120121

121122
let order_bys = parse_physical_sort_exprs(
122123
&proto_def.ordering_req,
123-
&default_ctx,
124+
&task_ctx,
124125
&schema,
125126
&codex,
126127
)?;
127128

128-
let exprs = parse_physical_exprs(&proto_def.expr, &default_ctx, &schema, &codex)?;
129+
let exprs = parse_physical_exprs(&proto_def.expr, &task_ctx, &schema, &codex)?;
129130

130131
Ok(Self {
131132
return_field,

datafusion/ffi/src/udwf/partition_evaluator_args.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
148148
.map_err(|e| DataFusionError::Execution(e.to_string()))?
149149
.iter()
150150
.map(|expr_node| {
151-
parse_physical_expr(expr_node, &default_ctx, &schema, &codec)
151+
parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec)
152152
})
153153
.collect::<Result<Vec<_>>>()?;
154154

datafusion/proto/src/bytes/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::physical_plan::{
2424
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
2525
};
2626
use crate::protobuf;
27+
use datafusion::execution::TaskContext;
2728
use datafusion_common::{plan_datafusion_err, Result};
2829
use datafusion_expr::{
2930
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
@@ -316,13 +317,13 @@ pub fn physical_plan_from_json(
316317
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
317318
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
318319
let extension_codec = DefaultPhysicalExtensionCodec {};
319-
back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
320+
back.try_into_physical_plan(&ctx.task_ctx(), &extension_codec)
320321
}
321322

322323
/// Deserialize a PhysicalPlan from bytes
323324
pub fn physical_plan_from_bytes(
324325
bytes: &[u8],
325-
ctx: &SessionContext,
326+
ctx: &TaskContext,
326327
) -> Result<Arc<dyn ExecutionPlan>> {
327328
let extension_codec = DefaultPhysicalExtensionCodec {};
328329
physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
@@ -331,10 +332,10 @@ pub fn physical_plan_from_bytes(
331332
/// Deserialize a PhysicalPlan from bytes
332333
pub fn physical_plan_from_bytes_with_extension_codec(
333334
bytes: &[u8],
334-
ctx: &SessionContext,
335+
ctx: &TaskContext,
335336
extension_codec: &dyn PhysicalExtensionCodec,
336337
) -> Result<Arc<dyn ExecutionPlan>> {
337338
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
338339
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
339-
protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
340+
protobuf.try_into_physical_plan(ctx, extension_codec)
340341
}

datafusion/proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
//! let bytes = physical_plan_to_bytes(physical_plan.clone())?;
116116
//!
117117
//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
118-
//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?;
118+
//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
119119
//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
120120
//! # Ok(())
121121
//! # }

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
3838
use datafusion::datasource::physical_plan::{
3939
FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
4040
};
41-
use datafusion::execution::FunctionRegistry;
41+
use datafusion::execution::{FunctionRegistry, TaskContext};
4242
use datafusion::logical_expr::WindowFunctionDefinition;
4343
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
4444
use datafusion::physical_plan::expressions::{
@@ -47,8 +47,6 @@ use datafusion::physical_plan::expressions::{
4747
};
4848
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
4949
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
50-
use datafusion::prelude::SessionContext;
51-
use datafusion_common::config::ConfigOptions;
5250
use datafusion_common::{not_impl_err, DataFusionError, Result};
5351
use datafusion_proto_common::common::proto_error;
5452

@@ -76,7 +74,7 @@ impl From<&protobuf::PhysicalColumn> for Column {
7674
/// * `codec` - An extension codec used to decode custom UDFs.
7775
pub fn parse_physical_sort_expr(
7876
proto: &protobuf::PhysicalSortExprNode,
79-
ctx: &SessionContext,
77+
ctx: &TaskContext,
8078
input_schema: &Schema,
8179
codec: &dyn PhysicalExtensionCodec,
8280
) -> Result<PhysicalSortExpr> {
@@ -103,7 +101,7 @@ pub fn parse_physical_sort_expr(
103101
/// * `codec` - An extension codec used to decode custom UDFs.
104102
pub fn parse_physical_sort_exprs(
105103
proto: &[protobuf::PhysicalSortExprNode],
106-
ctx: &SessionContext,
104+
ctx: &TaskContext,
107105
input_schema: &Schema,
108106
codec: &dyn PhysicalExtensionCodec,
109107
) -> Result<Vec<PhysicalSortExpr>> {
@@ -125,7 +123,7 @@ pub fn parse_physical_sort_exprs(
125123
/// * `codec` - An extension codec used to decode custom UDFs.
126124
pub fn parse_physical_window_expr(
127125
proto: &protobuf::PhysicalWindowExprNode,
128-
ctx: &SessionContext,
126+
ctx: &TaskContext,
129127
input_schema: &Schema,
130128
codec: &dyn PhysicalExtensionCodec,
131129
) -> Result<Arc<dyn WindowExpr>> {
@@ -186,7 +184,7 @@ pub fn parse_physical_window_expr(
186184

187185
pub fn parse_physical_exprs<'a, I>(
188186
protos: I,
189-
ctx: &SessionContext,
187+
ctx: &TaskContext,
190188
input_schema: &Schema,
191189
codec: &dyn PhysicalExtensionCodec,
192190
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
@@ -210,7 +208,7 @@ where
210208
/// * `codec` - An extension codec used to decode custom UDFs.
211209
pub fn parse_physical_expr(
212210
proto: &protobuf::PhysicalExprNode,
213-
ctx: &SessionContext,
211+
ctx: &TaskContext,
214212
input_schema: &Schema,
215213
codec: &dyn PhysicalExtensionCodec,
216214
) -> Result<Arc<dyn PhysicalExpr>> {
@@ -364,11 +362,8 @@ pub fn parse_physical_expr(
364362
let scalar_fun_def = Arc::clone(&udf);
365363

366364
let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;
367-
let config_options =
368-
match ctx.state().execution_props().config_options.as_ref() {
369-
Some(config_options) => Arc::clone(config_options),
370-
None => Arc::new(ConfigOptions::default()),
371-
};
365+
366+
let config_options = Arc::clone(ctx.session_config().options());
372367

373368
Arc::new(
374369
ScalarFunctionExpr::new(
@@ -419,7 +414,7 @@ pub fn parse_physical_expr(
419414

420415
fn parse_required_physical_expr(
421416
expr: Option<&protobuf::PhysicalExprNode>,
422-
ctx: &SessionContext,
417+
ctx: &TaskContext,
423418
field: &str,
424419
input_schema: &Schema,
425420
codec: &dyn PhysicalExtensionCodec,
@@ -433,7 +428,7 @@ fn parse_required_physical_expr(
433428

434429
pub fn parse_protobuf_hash_partitioning(
435430
partitioning: Option<&protobuf::PhysicalHashRepartition>,
436-
ctx: &SessionContext,
431+
ctx: &TaskContext,
437432
input_schema: &Schema,
438433
codec: &dyn PhysicalExtensionCodec,
439434
) -> Result<Option<Partitioning>> {
@@ -453,7 +448,7 @@ pub fn parse_protobuf_hash_partitioning(
453448

454449
pub fn parse_protobuf_partitioning(
455450
partitioning: Option<&protobuf::Partitioning>,
456-
ctx: &SessionContext,
451+
ctx: &TaskContext,
457452
input_schema: &Schema,
458453
codec: &dyn PhysicalExtensionCodec,
459454
) -> Result<Option<Partitioning>> {
@@ -491,7 +486,7 @@ pub fn parse_protobuf_file_scan_schema(
491486

492487
pub fn parse_protobuf_file_scan_config(
493488
proto: &protobuf::FileScanExecConf,
494-
ctx: &SessionContext,
489+
ctx: &TaskContext,
495490
codec: &dyn PhysicalExtensionCodec,
496491
file_source: Arc<dyn FileSource>,
497492
) -> Result<FileScanConfig> {

0 commit comments

Comments
 (0)