Skip to content

Commit 5d0b88e

Browse files
fix: Add partial support for recursive queries instrumentation
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
1 parent d6d29ec commit 5d0b88e

30 files changed

+1893
-48
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ async fn main() -> Result<()> {
119119

120120
A more complete example can be found in the [examples directory](https://github.com/datafusion-contrib/datafusion-tracing/tree/main/examples).
121121

122+
## Limitations
123+
124+
### Recursive Queries
125+
126+
When using DataFusion Tracing with recursive queries (e.g., those using `WITH RECURSIVE`), nodes of type `WorkTableExec` are intentionally not instrumented. This is due to a current limitation in DataFusion: instrumenting these nodes can break recursive query execution and result in errors such as `Unexpected empty work table.`
127+
128+
As a result, while most of the recursive query plan will be traced and metrics will be collected, some internal operations related to recursion will not be visible in the trace until upstream support is available. See [issue #5](https://github.com/datafusion-contrib/datafusion-tracing/issues/5) for details and tracking progress on this limitation.
129+
122130
<!-- cargo-rdme end -->
123131

124132
## Setting Up a Collector

datafusion-tracing/src/instrument_rule.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::instrumented::SpanCreateFn;
2222
use crate::options::InstrumentationOptions;
2323
use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer};
2424
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
25+
use datafusion::physical_plan::work_table::WorkTableExec;
2526
use datafusion::{
2627
config::ConfigOptions, physical_optimizer::PhysicalOptimizerRule,
2728
physical_plan::ExecutionPlan,
@@ -69,8 +70,15 @@ impl PhysicalOptimizerRule for InstrumentRule {
6970
_config: &ConfigOptions,
7071
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
7172
// Iterate over the plan and wrap each node with InstrumentedExec
73+
//
74+
// NOTE: Recursive queries dynamically rebuild the execution plan during execution and assume one of the nodes is a `WorkTableExec`.
75+
// Wrapping it with `InstrumentedExec`` would break the recursive query, so for now we only instrument non-`WorkTableExec` nodes.
76+
//
77+
// TODO: Remove this limitation once `with_work_table` is made a trait method of `ExecutionPlan` (tracked by https://github.com/apache/datafusion/pull/16469)
7278
plan.transform(|plan| {
73-
if plan.as_any().downcast_ref::<InstrumentedExec>().is_none() {
79+
if plan.as_any().downcast_ref::<InstrumentedExec>().is_none()
80+
&& plan.as_any().downcast_ref::<WorkTableExec>().is_none()
81+
{
7482
// Node is not InstrumentedExec; wrap it
7583
Ok(Transformed::yes(Arc::new(InstrumentedExec::new(
7684
plan,

datafusion-tracing/src/instrumented.rs

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ use datafusion::{
4545
use delegate::delegate;
4646
use std::{
4747
any::Any,
48-
fmt,
49-
fmt::Debug,
48+
collections::HashMap,
49+
fmt::{self, Debug},
5050
sync::{Arc, OnceLock},
5151
};
5252
use tracing::{field, Span};
@@ -105,6 +105,23 @@ impl InstrumentedExec {
105105
}
106106
}
107107

108+
/// Creates a new `InstrumentedExec` with the same configuration as this instance but with a different inner execution plan.
109+
///
110+
/// This method is used when the optimizer needs to replace the inner execution plan while preserving
111+
/// all the instrumentation settings (metrics recording, preview limits, span creation function, etc.).
112+
fn with_new_inner(&self, inner: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
113+
Arc::new(InstrumentedExec::new(
114+
inner,
115+
self.span_create_fn.clone(),
116+
&InstrumentationOptions {
117+
record_metrics: self.record_metrics,
118+
preview_limit: self.preview_limit,
119+
preview_fn: self.preview_fn.clone(),
120+
custom_fields: HashMap::new(), // custom fields are not used by `InstrumentedExec`, only by the higher-level `instrument_with_spans` macro family
121+
},
122+
))
123+
}
124+
108125
/// Retrieves the tracing span, initializing it if necessary.
109126
fn get_span(&self) -> Span {
110127
self.span
@@ -191,11 +208,6 @@ impl InstrumentedExec {
191208

192209
impl ExecutionPlan for InstrumentedExec {
193210
// Delegate all ExecutionPlan methods to the inner plan, except for `as_any` and `execute`.
194-
//
195-
// Note: Methods returning a new ExecutionPlan instance delegate directly to the inner plan,
196-
// resulting in loss of instrumentation. This is intentional since instrumentation should
197-
// be applied by the final PhysicalOptimizerRule pass. Therefore, any resulting plans will
198-
// be re-instrumented after optimizer transformations.
199211
delegate! {
200212
to self.inner {
201213
fn name(&self) -> &str;
@@ -207,25 +219,15 @@ impl ExecutionPlan for InstrumentedExec {
207219
fn maintains_input_order(&self) -> Vec<bool>;
208220
fn benefits_from_input_partitioning(&self) -> Vec<bool>;
209221
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
210-
fn repartitioned(
211-
&self,
212-
target_partitions: usize,
213-
config: &ConfigOptions,
214-
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
215222
fn metrics(&self) -> Option<MetricsSet>;
216223
// We need to delegate to the inner plan's statistics method to preserve behavior,
217224
// even though it's deprecated in favor of partition_statistics
218225
#[allow(deprecated)]
219226
fn statistics(&self) -> Result<Statistics>;
220227
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
221228
fn supports_limit_pushdown(&self) -> bool;
222-
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>;
223229
fn fetch(&self) -> Option<usize>;
224230
fn cardinality_effect(&self) -> CardinalityEffect;
225-
fn try_swapping_with_projection(
226-
&self,
227-
projection: &ProjectionExec,
228-
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
229231
fn gather_filters_for_pushdown(
230232
&self,
231233
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
@@ -239,15 +241,57 @@ impl ExecutionPlan for InstrumentedExec {
239241
}
240242
}
241243

242-
delegate! {
243-
to self.inner.clone() {
244-
fn with_new_children(
245-
self: Arc<Self>,
246-
children: Vec<Arc<dyn ExecutionPlan>>,
247-
) -> Result<Arc<dyn ExecutionPlan>>;
244+
/// Delegate to the inner plan for repartitioning and rewrap with an InstrumentedExec.
245+
fn repartitioned(
246+
&self,
247+
target_partitions: usize,
248+
config: &ConfigOptions,
249+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
250+
if let Some(new_inner) = self
251+
.inner
252+
.clone()
253+
.repartitioned(target_partitions, config)?
254+
{
255+
Ok(Some(self.with_new_inner(new_inner)))
256+
} else {
257+
Ok(None)
248258
}
249259
}
250260

261+
/// Delegate to the inner plan for fetching and rewrap with an InstrumentedExec.
262+
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
263+
if let Some(new_inner) = self.inner.clone().with_fetch(limit) {
264+
Some(self.with_new_inner(new_inner))
265+
} else {
266+
None
267+
}
268+
}
269+
270+
/// Delegate to the inner plan for swapping with a projection and rewrap with an InstrumentedExec.
271+
fn try_swapping_with_projection(
272+
&self,
273+
projection: &ProjectionExec,
274+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
275+
if let Some(new_inner) = self
276+
.inner
277+
.clone()
278+
.try_swapping_with_projection(projection)?
279+
{
280+
Ok(Some(self.with_new_inner(new_inner)))
281+
} else {
282+
Ok(None)
283+
}
284+
}
285+
286+
/// Delegate to the inner plan for creating new children and rewrap with an InstrumentedExec.
287+
fn with_new_children(
288+
self: Arc<Self>,
289+
children: Vec<Arc<dyn ExecutionPlan>>,
290+
) -> Result<Arc<dyn ExecutionPlan>> {
291+
let new_inner = self.inner.clone().with_new_children(children)?;
292+
Ok(self.with_new_inner(new_inner))
293+
}
294+
251295
fn as_any(&self) -> &dyn Any {
252296
self
253297
}

datafusion-tracing/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@
109109
//!
110110
//! A more complete example can be found in the [examples directory](https://github.com/datafusion-contrib/datafusion-tracing/tree/main/examples).
111111
//!
112+
//! # Limitations
113+
//!
114+
//! ## Recursive Queries
115+
//!
116+
//! When using DataFusion Tracing with recursive queries (e.g., those using `WITH RECURSIVE`), nodes of type `WorkTableExec` are intentionally not instrumented. This is due to a current limitation in DataFusion: instrumenting these nodes can break recursive query execution and result in errors such as `Unexpected empty work table.`
117+
//!
118+
//! As a result, while most of the recursive query plan will be traced and metrics will be collected, some internal operations related to recursion will not be visible in the trace until upstream support is available. See [issue #5](https://github.com/datafusion-contrib/datafusion-tracing/issues/5) for details and tracking progress on this limitation.
119+
//!
112120
113121
mod instrument_rule;
114122
mod instrumented;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
WITH RECURSIVE numbers(n) AS (
2+
SELECT 1 AS n
3+
UNION ALL
4+
SELECT n + 1 FROM numbers WHERE n < 3
5+
) SELECT n FROM numbers

tests/integration_tests.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ async fn test_scrabble_all_options() -> Result<()> {
165165
.await
166166
}
167167

168+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
169+
async fn test_recursive() -> Result<()> {
170+
execute_test_case("08_recursive", &QueryTestCase::new("recursive")).await
171+
}
172+
173+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
174+
async fn test_recursive_all_options() -> Result<()> {
175+
execute_test_case(
176+
"09_recursive_all_options",
177+
&QueryTestCase::new("recursive")
178+
.with_metrics_collection()
179+
.with_row_limit(5)
180+
.with_compact_preview(),
181+
)
182+
.await
183+
}
184+
168185
/// Executes the provided [`QueryTestCase`], setting up tracing and verifying
169186
/// log output according to its parameters.
170187
async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Result<()> {

tests/snapshots/03_basic_preview_00_PlaceholderRowExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
++

tests/snapshots/03_basic_preview_00_ProjectionExec.snap

Lines changed: 0 additions & 9 deletions
This file was deleted.

tests/snapshots/03_basic_preview_01_ProjectionExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
+----------+

tests/snapshots/04_basic_compact_preview_00_PlaceholderRowExec.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
source: integration/tests/integration_tests.rs
2+
source: tests/integration_tests.rs
33
expression: preview
44
---
55
++

0 commit comments

Comments
 (0)