Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cargo run --example dataframe
- [`examples/query_planning/analyzer_rule.rs`](examples/query_planning/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`examples/data_io/catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog
- [`examples/data_io/json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`examples/proto/composed_extension_codec`](examples/proto/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`examples/custom_data_source/csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`examples/custom_data_source/csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
- [`examples/custom_data_source/custom_datasource.rs`](examples/custom_data_source/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! See `main.rs` for how to run it.
//!
//! This example demonstrates how to compose multiple PhysicalExtensionCodecs
//!
//! This can be helpful when an Execution plan tree has different nodes from different crates
Expand Down Expand Up @@ -44,8 +46,8 @@ use datafusion_proto::physical_plan::{
};
use datafusion_proto::protobuf;

#[tokio::main]
async fn main() {
/// Example of using multiple extension codecs for serialization / deserialization
pub async fn composed_extension_codec() -> Result<()> {
// build execution plan that has both types of nodes
//
// Note each node requires a different `PhysicalExtensionCodec` to decode
Expand All @@ -66,16 +68,16 @@ async fn main() {
protobuf::PhysicalPlanNode::try_from_physical_plan(
exec_plan.clone(),
&composed_codec,
)
.expect("to proto");
)?;

// deserialize proto back to execution plan
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
.expect("from proto");
let result_exec_plan: Arc<dyn ExecutionPlan> =
proto.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)?;

// assert that the original and deserialized execution plans are equal
assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));

Ok(())
}

/// This example has two types of nodes: `ParentExec` and `ChildExec` which can only
Expand Down
89 changes: 89 additions & 0 deletions datafusion-examples/examples/proto/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # Examples demonstrating DataFusion's plan serialization via the `datafusion-proto` crate
//!
//! These examples show how to use multiple extension codecs for serialization / deserialization.
//!
//! ## Usage
//! ```bash
//! cargo run --example proto -- [composed_extension_codec]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `composed_extension_codec` — example of using multiple extension codecs for serialization / deserialization

mod composed_extension_codec;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
ComposedExtensionCodec,
}

impl AsRef<str> for ExampleKind {
fn as_ref(&self) -> &str {
match self {
Self::ComposedExtensionCodec => "composed_extension_codec",
}
}
}

impl FromStr for ExampleKind {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
match s {
"composed_extension_codec" => Ok(Self::ComposedExtensionCodec),
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
}
}
}

impl ExampleKind {
const ALL: [Self; 1] = [Self::ComposedExtensionCodec];

const EXAMPLE_NAME: &str = "proto";

fn variants() -> Vec<&'static str> {
Self::ALL.iter().map(|x| x.as_ref()).collect()
}
}

#[tokio::main]
async fn main() -> Result<()> {
let usage = format!(
"Usage: cargo run --example {} -- [{}]",
ExampleKind::EXAMPLE_NAME,
ExampleKind::variants().join("|")
);

let arg = std::env::args().nth(1).ok_or_else(|| {
eprintln!("{usage}");
DataFusionError::Execution("Missing argument".to_string())
})?;

match arg.parse::<ExampleKind>()? {
ExampleKind::ComposedExtensionCodec => {
composed_extension_codec::composed_extension_codec().await?
}
}

Ok(())
}