Add support for correlated subquery #6140

@mustafasrepo

Description

Is your feature request related to a problem or challenge?

When I run the query below:

SELECT s.amount * (
         SELECT e.amount
         FROM sales_us AS e
         WHERE s.currency = e.currency AND
               s.ts >= e.ts
         ORDER BY e.ts DESC
         LIMIT 1
       ) AS amount_usd
FROM sales_global AS s
ORDER BY s.ts;

DataFusion returns

Error: NotImplemented("Physical plan does not support logical expression (<subquery>)")

The same query runs successfully on PostgreSQL.

Describe the solution you'd like

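I would like DataFusion to support correlated scalar subqueries such as the one above, so that the query can be planned and executed instead of failing during physical planning.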
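
To make the requested behavior concrete, here is a minimal plain-Rust sketch of what the correlated subquery is expected to compute. It does not use DataFusion: the `Row` struct and the `amount_usd` function are hypothetical names introduced only for illustration, timestamps are simplified to integers, and the outer `ORDER BY` is left out.

```rust
// Plain-Rust sketch of the semantics of the correlated scalar subquery.
// `Row` and `amount_usd` are hypothetical names, not DataFusion APIs.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    ts: i64, // timestamp simplified to an integer
    currency: &'static str,
    amount: i64,
}

/// For each outer row `s`, pick the most recent inner row `e` with the same
/// currency and `e.ts <= s.ts`, then multiply the two amounts. A subquery
/// that returns no row yields NULL, modelled here as `None`.
fn amount_usd(sales_global: &[Row], sales_us: &[Row]) -> Vec<Option<i64>> {
    sales_global
        .iter()
        .map(|s| {
            sales_us
                .iter()
                // WHERE s.currency = e.currency AND s.ts >= e.ts
                .filter(|e| e.currency == s.currency && s.ts >= e.ts)
                // ORDER BY e.ts DESC LIMIT 1
                .max_by_key(|e| e.ts)
                .map(|e| s.amount * e.amount)
        })
        .collect()
}

fn main() {
    let sales_us = vec![
        Row { ts: 10, currency: "USD", amount: 100 },
        Row { ts: 20, currency: "USD", amount: 200 },
    ];
    let sales_global = vec![
        Row { ts: 5, currency: "USD", amount: 2 },  // no earlier inner row
        Row { ts: 15, currency: "USD", amount: 2 }, // matches the ts = 10 row
        Row { ts: 25, currency: "EUR", amount: 2 }, // no inner row with this currency
    ];
    // prints [None, Some(200), None]
    println!("{:?}", amount_usd(&sales_global, &sales_us));
}
```

The inner lookup runs once per outer row, which is the straightforward reading of the SQL; an engine would more likely decorrelate the subquery into a join, but the result should be the same.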

Describe alternatives you've considered

No response

Additional context

To reproduce, you can use the test below:

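use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;

// Builds two small tables, then plans and executes the correlated subquery.
#[tokio::test]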
async fn test_subquery() -> Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(1);
    let ctx = SessionContext::with_config(config);
    ctx.sql("CREATE TABLE sales_us (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
              ('2022-01-01 10:00:00'::timestamp, 'USD', 100.00),
              ('2022-01-01 11:00:00'::timestamp, 'USD', 200.00),
              ('2022-01-02 09:00:00'::timestamp, 'USD', 300.00),
              ('2022-01-02 10:00:00'::timestamp, 'USD', 150.00)").await?;
    ctx.sql("CREATE TABLE sales_global (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
          ('2022-01-01 08:00:00'::timestamp, 'EUR', 50.00),
          ('2022-01-01 11:30:00'::timestamp, 'EUR', 75.00),
          ('2022-01-02 12:00:00'::timestamp, 'EUR', 200.00),
          ('2022-01-03 10:00:00'::timestamp, 'EUR', 100.00)").await?;
    let sql = "SELECT s.amount * (
                 SELECT e.amount
                 FROM sales_us AS e
                 WHERE s.currency = e.currency AND
                         s.ts >= e.ts
                 ORDER BY e.ts DESC
                 LIMIT 1
               ) AS amount_usd
        FROM sales_global AS s
        ORDER BY s.ts
        ";

    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe: DataFrame = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}
