Skip to content

Implement LogicalPlan serde in datafusion-proto #2639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ path = "src/lib.rs"

[dependencies]
arrow = { version = "14.0.0" }
async-trait = "0.1"
datafusion = { path = "../core", version = "8.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-data-access = { path = "../data-access", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
prost = "0.10"
tokio = "1.18"

[dev-dependencies]
doc-comment = "0.3"

[build-dependencies]
tonic-build = { version = "0.7" }
46 changes: 46 additions & 0 deletions datafusion/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,50 @@

This crate is a submodule of DataFusion that provides a protocol buffer format for representing query plans and expressions.

## Serializing Expressions

Based on [examples/expr_serde.rs](examples/expr_serde.rs)

```rust
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr};
use datafusion_proto::bytes::Serializeable;

fn main() -> Result<()> {
// Create a new `Expr` a < 32
let expr = col("a").lt(lit(5i32));

// Convert it to an opaque form
let bytes = expr.to_bytes()?;

// Decode bytes from somewhere (over network, etc.)
let decoded_expr = Expr::from_bytes(&bytes)?;
assert_eq!(expr, decoded_expr);
Ok(())
}
```

## Serializing Plans

Based on [examples/plan_serde.rs](examples/plan_serde.rs)

```rust
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
.await
?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
```

[df]: https://crates.io/crates/datafusion
33 changes: 33 additions & 0 deletions datafusion/proto/examples/expr_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr};
use datafusion_proto::bytes::Serializeable;

fn main() -> Result<()> {
// Create a new `Expr` a < 32
let expr = col("a").lt(lit(5i32));

// Convert it to an opaque form
let bytes = expr.to_bytes()?;

// Decode bytes from somewhere (over network, etc.)
let decoded_expr = Expr::from_bytes(&bytes)?;
assert_eq!(expr, decoded_expr);
Ok(())
}
32 changes: 32 additions & 0 deletions datafusion/proto/examples/plan_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
219 changes: 218 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ syntax = "proto3";
package datafusion;

option java_multiple_files = true;
option java_package = "org.datafusioncompute.protobuf";
option java_package = "org.apache.arrow.datafusion.protobuf";
option java_outer_classname = "DatafusionProto";

message ColumnRelation {
Expand All @@ -43,6 +43,223 @@ message DfSchema {
map<string, string> metadata = 2;
}

// logical plan
// LogicalPlan is a nested type
message LogicalPlanNode {
oneof LogicalPlanType {
ListingTableScanNode listing_scan = 1;
ProjectionNode projection = 3;
SelectionNode selection = 4;
LimitNode limit = 5;
AggregateNode aggregate = 6;
JoinNode join = 7;
SortNode sort = 8;
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
ExplainNode explain = 12;
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
OffsetNode offset = 23;
}
}

message LogicalExtensionNode {
bytes node = 1;
repeated LogicalPlanNode inputs = 2;
}

message ProjectionColumns {
repeated string columns = 1;
}

message CsvFormat {
bool has_header = 1;
string delimiter = 2;
}

message ParquetFormat {
bool enable_pruning = 1;
}

message AvroFormat {}

message ListingTableScanNode {
string table_name = 1;
string path = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
datafusion.Schema schema = 5;
repeated datafusion.LogicalExprNode filters = 6;
repeated string table_partition_cols = 7;
bool collect_stat = 8;
uint32 target_partitions = 9;
oneof FileFormatType {
CsvFormat csv = 10;
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
}

message ProjectionNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
oneof optional_alias {
string alias = 3;
}
}

message SelectionNode {
LogicalPlanNode input = 1;
datafusion.LogicalExprNode expr = 2;
}

message SortNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
}

message RepartitionNode {
LogicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
HashRepartition hash = 3;
}
}

message HashRepartition {
repeated datafusion.LogicalExprNode hash_expr = 1;
uint64 partition_count = 2;
}

message EmptyRelationNode {
bool produce_one_row = 1;
}

message CreateExternalTableNode {
string name = 1;
string location = 2;
FileType file_type = 3;
bool has_header = 4;
datafusion.DfSchema schema = 5;
repeated string table_partition_cols = 6;
bool if_not_exists = 7;
string delimiter = 8;
}

message CreateCatalogSchemaNode {
string schema_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
}

message CreateCatalogNode {
string catalog_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
}

message CreateViewNode {
string name = 1;
LogicalPlanNode input = 2;
bool or_replace = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
uint64 n_cols = 1;
repeated datafusion.LogicalExprNode values_list = 2;
}

enum FileType {
NdJson = 0;
Parquet = 1;
CSV = 2;
Avro = 3;
}

message AnalyzeNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message AggregateNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode group_expr = 2;
repeated datafusion.LogicalExprNode aggr_expr = 3;
}

message WindowNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode window_expr = 2;
}

enum JoinType {
INNER = 0;
LEFT = 1;
RIGHT = 2;
FULL = 3;
SEMI = 4;
ANTI = 5;
}

enum JoinConstraint {
ON = 0;
USING = 1;
}

message JoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
JoinType join_type = 3;
JoinConstraint join_constraint = 4;
repeated datafusion.Column left_join_column = 5;
repeated datafusion.Column right_join_column = 6;
bool null_equals_null = 7;
}

message UnionNode {
repeated LogicalPlanNode inputs = 1;
}

message CrossJoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
}

message LimitNode {
LogicalPlanNode input = 1;
uint32 limit = 2;
}

message OffsetNode {
LogicalPlanNode input = 1;
uint32 offset = 2;
}

message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
}

message SubqueryAliasNode {
LogicalPlanNode input = 1;
string alias = 2;
}

// logical expressions
message LogicalExprNode {
oneof ExprType {
Expand Down
Loading