Skip to content

Commit b6be176

Browse files
fix: Add partial support for recursive queries instrumentation
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
1 parent d6d29ec commit b6be176

28 files changed

+1877
-48
lines changed

datafusion-tracing/src/instrument_rule.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::instrumented::SpanCreateFn;
2222
use crate::options::InstrumentationOptions;
2323
use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer};
2424
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
25+
use datafusion::physical_plan::work_table::WorkTableExec;
2526
use datafusion::{
2627
config::ConfigOptions, physical_optimizer::PhysicalOptimizerRule,
2728
physical_plan::ExecutionPlan,
@@ -69,8 +70,15 @@ impl PhysicalOptimizerRule for InstrumentRule {
6970
_config: &ConfigOptions,
7071
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
7172
// Iterate over the plan and wrap each node with InstrumentedExec
73+
//
74+
// NOTE: Recursive queries dynamically rebuild the execution plan during execution and assume one of the nodes is a `WorkTableExec`.
75+
// Wrapping it with `InstrumentedExec` would break the recursive query, so for now we only instrument non-`WorkTableExec` nodes.
76+
//
77+
// TODO: Remove this limitation once `with_work_table` is made a trait method of `ExecutionPlan` (tracked by https://github.com/apache/datafusion/pull/16469)
7278
plan.transform(|plan| {
73-
if plan.as_any().downcast_ref::<InstrumentedExec>().is_none() {
79+
if plan.as_any().downcast_ref::<InstrumentedExec>().is_none()
80+
&& plan.as_any().downcast_ref::<WorkTableExec>().is_none()
81+
{
7482
// Node is not InstrumentedExec; wrap it
7583
Ok(Transformed::yes(Arc::new(InstrumentedExec::new(
7684
plan,

datafusion-tracing/src/instrumented.rs

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ use datafusion::{
4545
use delegate::delegate;
4646
use std::{
4747
any::Any,
48-
fmt,
49-
fmt::Debug,
48+
collections::HashMap,
49+
fmt::{self, Debug},
5050
sync::{Arc, OnceLock},
5151
};
5252
use tracing::{field, Span};
@@ -105,6 +105,23 @@ impl InstrumentedExec {
105105
}
106106
}
107107

108+
/// Creates a new `InstrumentedExec` with the same configuration as this instance but with a different inner execution plan.
109+
///
110+
/// This method is used when the optimizer needs to replace the inner execution plan while preserving
111+
/// all the instrumentation settings (metrics recording, preview limits, span creation function, etc.).
112+
fn with_new_inner(&self, inner: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
113+
Arc::new(InstrumentedExec::new(
114+
inner,
115+
self.span_create_fn.clone(),
116+
&InstrumentationOptions {
117+
record_metrics: self.record_metrics,
118+
preview_limit: self.preview_limit,
119+
preview_fn: self.preview_fn.clone(),
120+
custom_fields: HashMap::new(),
121+
},
122+
))
123+
}
124+
108125
/// Retrieves the tracing span, initializing it if necessary.
109126
fn get_span(&self) -> Span {
110127
self.span
@@ -191,11 +208,6 @@ impl InstrumentedExec {
191208

192209
impl ExecutionPlan for InstrumentedExec {
193210
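// Delegate most ExecutionPlan methods to the inner plan. `as_any` and `execute` are implemented directly, and the methods that return a new plan (`repartitioned`, `with_fetch`, `try_swapping_with_projection`, `with_new_children`) call the inner plan and rewrap the result in an `InstrumentedExec`.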
194-
//
195-
// Note: Methods returning a new ExecutionPlan instance delegate directly to the inner plan,
196-
// resulting in loss of instrumentation. This is intentional since instrumentation should
197-
// be applied by the final PhysicalOptimizerRule pass. Therefore, any resulting plans will
198-
// be re-instrumented after optimizer transformations.
199211
delegate! {
200212
to self.inner {
201213
fn name(&self) -> &str;
@@ -207,25 +219,15 @@ impl ExecutionPlan for InstrumentedExec {
207219
fn maintains_input_order(&self) -> Vec<bool>;
208220
fn benefits_from_input_partitioning(&self) -> Vec<bool>;
209221
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
210-
fn repartitioned(
211-
&self,
212-
target_partitions: usize,
213-
config: &ConfigOptions,
214-
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
215222
fn metrics(&self) -> Option<MetricsSet>;
216223
// We need to delegate to the inner plan's statistics method to preserve behavior,
217224
// even though it's deprecated in favor of partition_statistics
218225
#[allow(deprecated)]
219226
fn statistics(&self) -> Result<Statistics>;
220227
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
221228
fn supports_limit_pushdown(&self) -> bool;
222-
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>;
223229
fn fetch(&self) -> Option<usize>;
224230
fn cardinality_effect(&self) -> CardinalityEffect;
225-
fn try_swapping_with_projection(
226-
&self,
227-
projection: &ProjectionExec,
228-
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
229231
fn gather_filters_for_pushdown(
230232
&self,
231233
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
@@ -239,15 +241,57 @@ impl ExecutionPlan for InstrumentedExec {
239241
}
240242
}
241243

242-
delegate! {
243-
to self.inner.clone() {
244-
fn with_new_children(
245-
self: Arc<Self>,
246-
children: Vec<Arc<dyn ExecutionPlan>>,
247-
) -> Result<Arc<dyn ExecutionPlan>>;
244+
/// Delegate to the inner plan for repartitioning and rewrap with an InstrumentedExec.
245+
fn repartitioned(
246+
&self,
247+
target_partitions: usize,
248+
config: &ConfigOptions,
249+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
250+
if let Some(new_inner) = self
251+
.inner
252+
.clone()
253+
.repartitioned(target_partitions, config)?
254+
{
255+
Ok(Some(self.with_new_inner(new_inner)))
256+
} else {
257+
Ok(None)
248258
}
249259
}
250260

261+
/// Delegate to the inner plan for fetching and rewrap with an InstrumentedExec.
262+
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
263+
if let Some(new_inner) = self.inner.clone().with_fetch(limit) {
264+
Some(self.with_new_inner(new_inner))
265+
} else {
266+
None
267+
}
268+
}
269+
270+
/// Delegate to the inner plan for swapping with a projection and rewrap with an InstrumentedExec.
271+
fn try_swapping_with_projection(
272+
&self,
273+
projection: &ProjectionExec,
274+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
275+
if let Some(new_inner) = self
276+
.inner
277+
.clone()
278+
.try_swapping_with_projection(projection)?
279+
{
280+
Ok(Some(self.with_new_inner(new_inner)))
281+
} else {
282+
Ok(None)
283+
}
284+
}
285+
286+
/// Delegate to the inner plan for creating new children and rewrap with an InstrumentedExec.
287+
fn with_new_children(
288+
self: Arc<Self>,
289+
children: Vec<Arc<dyn ExecutionPlan>>,
290+
) -> Result<Arc<dyn ExecutionPlan>> {
291+
let new_inner = self.inner.clone().with_new_children(children)?;
292+
Ok(self.with_new_inner(new_inner))
293+
}
294+
251295
fn as_any(&self) -> &dyn Any {
252296
self
253297
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
WITH RECURSIVE numbers(n) AS (
2+
SELECT 1 AS n
3+
UNION ALL
4+
SELECT n + 1 FROM numbers WHERE n < 3
5+
) SELECT n FROM numbers

tests/integration_tests.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ async fn test_scrabble_all_options() -> Result<()> {
165165
.await
166166
}
167167

168+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
169+
async fn test_recursive() -> Result<()> {
170+
execute_test_case("08_recursive", &QueryTestCase::new("recursive")).await
171+
}
172+
173+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
174+
async fn test_recursive_all_options() -> Result<()> {
175+
execute_test_case(
176+
"09_recursive_all_options",
177+
&QueryTestCase::new("recursive")
178+
.with_metrics_collection()
179+
.with_row_limit(5)
180+
.with_compact_preview(),
181+
)
182+
.await
183+
}
184+
168185
/// Executes the provided [`QueryTestCase`], setting up tracing and verifying
169186
/// log output according to its parameters.
170187
async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Result<()> {

tests/snapshots/03_basic_preview_00_PlaceholderRowExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
++

tests/snapshots/03_basic_preview_00_ProjectionExec.snap

Lines changed: 0 additions & 9 deletions
This file was deleted.

tests/snapshots/03_basic_preview_01_ProjectionExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
+----------+

tests/snapshots/04_basic_compact_preview_00_PlaceholderRowExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
++

tests/snapshots/04_basic_compact_preview_01_ProjectionExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
+----------+

tests/snapshots/05_basic_all_options_00_PlaceholderRowExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
++

0 commit comments

Comments
 (0)