Can't serialize example ExecutionPlan to substrait #9299

Open · Tracked by #5173
alamb opened this issue Feb 21, 2024 · 4 comments
Labels: documentation, enhancement, substrait

Comments

alamb (Contributor) commented Feb 21, 2024

Is your feature request related to a problem or challenge?

While working on an example for serializing substrait plans (see PR #9260), I found I could not write an example for serializing an execution plan: the feature is not yet complete enough to support one.

Describe the solution you'd like

I would like to add the following example (or something like it) to datafusion/substrait/src/lib.rs and have it work:

//! # Example: Serializing [`ExecutionPlan`]s
//!
//! This functionality is still under development and only works for a small subset of plans
//!
//! ```
//! # use datafusion::prelude::*;
//! # use std::collections::HashMap;
//! # use std::sync::Arc;
//! # use datafusion::error::Result;
//! # use datafusion::arrow::array::Int32Array;
//! # use datafusion::arrow::record_batch::RecordBatch;
//! # use datafusion_substrait::physical_plan;
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() -> Result<()>{
//! // Create a plan that scans table 't'
//!  let ctx = SessionContext::new();
//!  let batch = RecordBatch::try_from_iter(vec![("x", Arc::new(Int32Array::from(vec![42])) as _)])?;
//!  ctx.register_batch("t", batch)?;
//!  let df = ctx.sql("SELECT x from t").await?;
//!  let physical_plan = df.create_physical_plan().await?;
//!
//!  // Convert the plan into a substrait (protobuf) Rel
//!  let mut extension_info = (vec![], HashMap::new());
//!  let substrait_plan = physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;
//!
//!  // Convert the substrait Rel back into an ExecutionPlan (in a real
//!  // deployment the Rel's protobuf bytes would arrive over the network, etc.)
//!  let physical_round_trip = physical_plan::consumer::from_substrait_rel(
//!     &ctx, &substrait_plan, &HashMap::new()
//!  ).await?;
//!  assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
//! # Ok(())
//! # }
//! ```
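As an aside, the "bytes" that comment alludes to could be produced with prost (the substrait crate's generated types implement prost::Message). A hedged sketch continuing from the example above, assuming the substrait crate is a direct dependency:

use prost::Message;

// Encode the substrait Rel to protobuf bytes for transport...
let bytes = substrait_plan.encode_to_vec();
// ...and decode them back on the receiving side.
let decoded = substrait::proto::Rel::decode(bytes.as_slice()).expect("valid substrait protobuf");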

When you run this test today, you get an error along the lines of "mem provider not implemented".

Describe alternatives you've considered

No response

Additional context

I think making this work would be a matter of implementing serialization for MemTable / MemoryExec, perhaps.
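For illustration only, here is a sketch of the substrait shape such a producer might emit: a ReadRel whose read_type is a virtual table carrying the rows as literals. Both the mapping and the exact generated type paths from the substrait crate are assumptions, not current producer behavior:

use substrait::proto::expression::literal::{LiteralType, Struct};
use substrait::proto::expression::Literal;
use substrait::proto::read_rel::{ReadType, VirtualTable};
use substrait::proto::rel::RelType;
use substrait::proto::{ReadRel, Rel};

// Hypothetical: encode the single-row table `t` from the example above
// (x = 42) as a ReadRel over a virtual table of literal rows.
fn memory_table_as_virtual_table() -> Rel {
    let row = Struct {
        fields: vec![Literal {
            nullable: false,
            type_variation_reference: 0,
            literal_type: Some(LiteralType::I32(42)),
        }],
    };
    Rel {
        rel_type: Some(RelType::Read(Box::new(ReadRel {
            read_type: Some(ReadType::VirtualTable(VirtualTable { values: vec![row] })),
            ..Default::default()
        }))),
    }
}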

devinjdangelo (Contributor) commented:

I am interested in taking a look at this. I was able to reproduce the error using the example above:

Error: Substrait("Unsupported plan in Substrait physical plan producer: MemoryExec: partitions=1, partition_sizes=[1]\n")

devinjdangelo (Contributor) commented Feb 21, 2024

I looked into the example and the physical plan substrait producer/consumer code. Unfortunately, for physical plans the substrait consumer and producer are only implemented for ParquetExec, and even then not fully, so I do not believe any practical example will execute without further development.

Here is an example which makes it further than the above but panics on the roundtrip assertion:

use datafusion::prelude::*;
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion_substrait::physical_plan;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()>{
    // Create a plan that scans table 't'
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx
        .sql(
            "SELECT * from alltypes_plain",
        )
        .await?;

    let physical_plan = df.create_physical_plan().await?;

    // Convert the plan into a substrait (protobuf) Rel
    let mut extension_info = (vec![], HashMap::new());
    let substrait_plan = physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;

    // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
    let physical_round_trip = physical_plan::consumer::from_substrait_rel(
        &ctx, &substrait_plan, &HashMap::new()
    ).await?;
    assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
    Ok(())
}

And here is the panic output:

thread 'main' panicked at datafusion/substrait/src/lib.rs:37:2:
assertion `left == right` failed
  left: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], projected_statistics: Statistics { num_rows: Exact(8), total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, projected_schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bool_col\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"tinyint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"smallint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"int_col\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bigint_col\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"float_col\", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"double_col\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"date_string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_col\", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
 right: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, projected_schema: Schema { fields: [], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
stack backtrace:
...

You can see that the round trip lost many details about the ParquetExec such as projected_schema and projected_statistics.
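As a diagnostic aid (not part of the proposed example), one way to compare the plans is by their indented display form rather than Debug output; displayable from datafusion::physical_plan shows the operator tree without every internal field:

use std::sync::Arc;
use datafusion::physical_plan::{displayable, ExecutionPlan};

// Print both plans in indented display form; differences here indicate lost
// operator structure, whereas fields like projected_statistics only show up
// in the Debug output compared above.
fn print_round_trip(original: &Arc<dyn ExecutionPlan>, round_trip: &Arc<dyn ExecutionPlan>) {
    println!("original:\n{}", displayable(original.as_ref()).indent(true));
    println!("round trip:\n{}", displayable(round_trip.as_ref()).indent(true));
}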

I think if we want to include a user facing example of a physical plan substrait roundtrip, we will need to cut a ticket to complete the implementation of ParquetExec to substrait first.

It looks like #5176 built the initial framework for serializing physical plans, but it hasn't been picked up since then.

alamb (Contributor, Author) commented Feb 26, 2024

> I think if we want to include a user facing example of a physical plan substrait roundtrip, we will need to cut a ticket to complete the implementation of ParquetExec to substrait first.

I filed #9347 to track this

Thank you for looking into this @devinjdangelo

I also made #5173 an epic and added this ticket and #9347 to it

alamb (Contributor, Author) commented Jun 9, 2024

I added this to the substrait support epic: #5173
