Recursive queries fail with "Unexpected empty work table" when instrumentation is enabled #5

Open
@debajyoti-truefoundry

Describe the bug

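Recursive queries fail with the error "Unexpected empty work table" when the datafusion_tracing instrumentation rule is registered as a physical optimizer rule. The reproduction below also runs the same recursive CTE on a session built without the rule, for comparison.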
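This report does not identify a cause. One guess worth checking, and only a guess: if the recursive operator finds its work-table reader by downcasting plan nodes to a concrete type, a wrapper node placed around that reader would hide it from the lookup. The sketch below shows that general Rust behaviour with made-up types. `PlanNode`, `WorkTableLeaf`, `Instrumented`, and `is_work_table_leaf` are all hypothetical names for illustration; none of them are DataFusion or datafusion_tracing APIs.

```rust
use std::any::Any;

// Hypothetical stand-in for a plan node trait (not a DataFusion type).
trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

// Plays the role of the leaf that reads the work table.
struct WorkTableLeaf;

impl PlanNode for WorkTableLeaf {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Plays the role of an instrumentation wrapper around another node.
struct Instrumented {
    inner: Box<dyn PlanNode>,
}

impl PlanNode for Instrumented {
    fn as_any(&self) -> &dyn Any {
        // Exposes the wrapper itself, not the wrapped node.
        self
    }
}

// A lookup by concrete type, as a parent operator might perform.
fn is_work_table_leaf(node: &dyn PlanNode) -> bool {
    node.as_any().downcast_ref::<WorkTableLeaf>().is_some()
}

fn main() {
    let wrapped = Instrumented {
        inner: Box::new(WorkTableLeaf),
    };
    // The lookup succeeds on the bare leaf and on the wrapper's inner node,
    // but fails on the wrapper itself.
    println!("bare leaf found: {}", is_work_table_leaf(&WorkTableLeaf));
    println!("wrapper found: {}", is_work_table_leaf(&wrapped));
    println!("inner found: {}", is_work_table_leaf(wrapped.inner.as_ref()));
}
```

If something like this is happening, the work table would never be handed to the reader inside the recursive term, which would be consistent with the error message. Whether that matches the actual code path has not been verified here.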

To Reproduce

use datafusion::{
    arrow::{array::RecordBatch, util::pretty::pretty_format_batches},
    error::Result,
    execution::SessionStateBuilder,
    prelude::*,
};
use datafusion_tracing::{
    InstrumentationOptions, instrument_with_info_spans, pretty_format_compact_batch,
};
use std::sync::Arc;
use tracing::field;

#[tokio::main]
async fn main() -> Result<()> {
    let options = InstrumentationOptions::builder()
        .record_metrics(true)
        .preview_limit(5)
        .preview_fn(Arc::new(|batch: &RecordBatch| {
            pretty_format_compact_batch(batch, 64, 3, 10).map(|fmt| fmt.to_string())
        }))
        .add_custom_field("env", "production")
        .add_custom_field("region", "us-west")
        .build();

    let instrument_rule = instrument_with_info_spans!(
        options: options,
        env = field::Empty,
        region = field::Empty,
    );

    let session_state = SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rule(instrument_rule)
        .build();

    let ctx = SessionContext::new_with_state(session_state);

    println!("Running recursive CTE query...");
    let recursive_sql = "WITH RECURSIVE numbers(n) AS (\
        SELECT 1 AS n \
        UNION ALL \
        SELECT n + 1 FROM numbers WHERE n < 10 \
    ) SELECT n FROM numbers";

    match ctx.sql(recursive_sql).await?.collect().await {
        Ok(recursive_results) => {
            println!(
                "Recursive Query Results (1..10):\n{}",
                pretty_format_batches(recursive_results.as_slice())?
            );
        }
        Err(e) => {
            println!("Recursive CTE failed with error: {}", e);
            println!("This demonstrates the instrumentation issue!");
        }
    }

    println!("\nTesting without instrumentation...");
    let clean_session_state = SessionStateBuilder::new().with_default_features().build();

    let clean_ctx = SessionContext::new_with_state(clean_session_state);

    match clean_ctx.sql(recursive_sql).await?.collect().await {
        Ok(clean_results) => {
            println!(
                "Clean Recursive Query Results (1..10):\n{}",
                pretty_format_batches(clean_results.as_slice())?
            );
            println!("Without instrumentation, recursive CTE works!");
        }
        Err(e) => {
            println!("Even clean context failed: {}", e);
        }
    }

    Ok(())
}
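For readers unfamiliar with the term in the error message, the following is a minimal plain-Rust model of how a recursive CTE is conventionally evaluated with a work table: the static term seeds the table, and the recursive term is re-run against it until it produces no rows. It involves no DataFusion code. `evaluate_recursive_cte` and `numbers_up_to_ten` are hypothetical helpers written for this illustration, and the two closures mirror the two branches of the query above.

```rust
// Plain-Rust model of iterative recursive-CTE evaluation.
// `evaluate_recursive_cte` is a hypothetical helper, not a DataFusion API.
fn evaluate_recursive_cte<S, R>(static_term: S, recursive_term: R) -> Vec<i64>
where
    S: FnOnce() -> Vec<i64>,
    R: Fn(&[i64]) -> Vec<i64>,
{
    // The static term seeds the work table.
    let mut work_table = static_term();
    let mut output = Vec::new();

    // Each pass emits the current work table, then replaces it with the rows
    // the recursive term derives from it. Evaluation ends when a pass yields
    // no rows.
    while !work_table.is_empty() {
        output.extend_from_slice(&work_table);
        work_table = recursive_term(&work_table);
    }
    output
}

fn numbers_up_to_ten() -> Vec<i64> {
    evaluate_recursive_cte(
        // SELECT 1 AS n
        || vec![1],
        // SELECT n + 1 FROM numbers WHERE n < 10
        |rows| rows.iter().filter(|&&n| n < 10).map(|&n| n + 1).collect(),
    )
}

fn main() {
    println!("{:?}", numbers_up_to_ten());
}
```

In this model the recursive term only works if it reads the same work table that the loop writes to, which is the invariant the error message suggests is being broken.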

Expected behavior

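With instrumentation enabled, the recursive CTE should return the rows 1 through 10, matching the result from the session built without the instrumentation rule.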

Additional context

No response
