-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add serde for plans with tables from TableProviderFactory
s
#3907
Changes from 8 commits
9311f38
b6e694c
b79aa87
ff75155
03c4b65
7ff0caa
5ac869f
769aec1
95698df
6b79f85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,6 +70,7 @@ message LogicalPlanNode { | |
CreateViewNode create_view = 22; | ||
DistinctNode distinct = 23; | ||
ViewTableScanNode view_scan = 24; | ||
CustomTableScanNode custom_scan = 25; | ||
} | ||
} | ||
|
||
|
@@ -118,6 +119,15 @@ message ViewTableScanNode { | |
string definition = 5; | ||
} | ||
|
||
// Logical Plan to Scan a CustomTableProvider registered at runtime | ||
message CustomTableScanNode { | ||
string table_name = 1; | ||
ProjectionColumns projection = 2; | ||
datafusion.Schema schema = 3; | ||
repeated datafusion.LogicalExprNode filters = 4; | ||
bytes custom_table_data = 5; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above 4 fields are unrelated to the |
||
} | ||
|
||
message ProjectionNode { | ||
LogicalPlanNode input = 1; | ||
repeated datafusion.LogicalExprNode expr = 2; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,12 +18,16 @@ | |
//! Serialization / Deserialization to Bytes | ||
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; | ||
use crate::{from_proto::parse_expr, protobuf}; | ||
use arrow::datatypes::SchemaRef; | ||
use async_trait::async_trait; | ||
use datafusion::datasource::TableProvider; | ||
use datafusion_common::{DataFusionError, Result}; | ||
use datafusion_expr::{Expr, Extension, LogicalPlan}; | ||
use prost::{ | ||
bytes::{Bytes, BytesMut}, | ||
Message, | ||
}; | ||
use std::sync::Arc; | ||
|
||
// Reexport Bytes which appears in the API | ||
use datafusion::execution::registry::FunctionRegistry; | ||
|
@@ -132,37 +136,41 @@ pub fn logical_plan_to_bytes_with_extension_codec( | |
|
||
/// Deserialize a LogicalPlan from json | ||
#[cfg(feature = "json")] | ||
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> { | ||
pub async fn logical_plan_from_json( | ||
json: &str, | ||
ctx: &SessionContext, | ||
) -> Result<LogicalPlan> { | ||
let back: protobuf::LogicalPlanNode = serde_json::from_str(json) | ||
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?; | ||
let extension_codec = DefaultExtensionCodec {}; | ||
back.try_into_logical_plan(ctx, &extension_codec) | ||
back.try_into_logical_plan(ctx, &extension_codec).await | ||
} | ||
|
||
/// Deserialize a LogicalPlan from bytes | ||
pub fn logical_plan_from_bytes( | ||
pub async fn logical_plan_from_bytes( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense to me that these functions must become There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To follow up -- becomes |
||
bytes: &[u8], | ||
ctx: &SessionContext, | ||
) -> Result<LogicalPlan> { | ||
let extension_codec = DefaultExtensionCodec {}; | ||
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec) | ||
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await | ||
} | ||
|
||
/// Deserialize a LogicalPlan from bytes | ||
pub fn logical_plan_from_bytes_with_extension_codec( | ||
pub async fn logical_plan_from_bytes_with_extension_codec( | ||
bytes: &[u8], | ||
ctx: &SessionContext, | ||
extension_codec: &dyn LogicalExtensionCodec, | ||
) -> Result<LogicalPlan> { | ||
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| { | ||
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e)) | ||
})?; | ||
protobuf.try_into_logical_plan(ctx, extension_codec) | ||
protobuf.try_into_logical_plan(ctx, extension_codec).await | ||
} | ||
|
||
#[derive(Debug)] | ||
struct DefaultExtensionCodec {} | ||
|
||
#[async_trait] | ||
impl LogicalExtensionCodec for DefaultExtensionCodec { | ||
fn try_decode( | ||
&self, | ||
|
@@ -180,6 +188,27 @@ impl LogicalExtensionCodec for DefaultExtensionCodec { | |
"No extension codec provided".to_string(), | ||
)) | ||
} | ||
|
||
async fn try_decode_table_provider( | ||
&self, | ||
_buf: &[u8], | ||
_schema: SchemaRef, | ||
_ctx: &SessionContext, | ||
) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> { | ||
Err(DataFusionError::NotImplemented( | ||
"No extension codec provided".to_string(), | ||
avantgardnerio marked this conversation as resolved.
Show resolved
Hide resolved
|
||
)) | ||
} | ||
|
||
fn try_encode_table_provider( | ||
&self, | ||
_node: Arc<dyn TableProvider>, | ||
_buf: &mut Vec<u8>, | ||
) -> std::result::Result<(), DataFusionError> { | ||
Err(DataFusionError::NotImplemented( | ||
"No extension codec provided".to_string(), | ||
avantgardnerio marked this conversation as resolved.
Show resolved
Hide resolved
|
||
)) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
|
@@ -214,12 +243,12 @@ mod test { | |
assert_eq!(actual, expected); | ||
} | ||
|
||
#[test] | ||
#[tokio::test] | ||
#[cfg(feature = "json")] | ||
fn json_to_plan() { | ||
async fn json_to_plan() { | ||
let input = r#"{"emptyRelation":{}}"#.to_string(); | ||
let ctx = SessionContext::new(); | ||
let actual = logical_plan_from_json(&input, &ctx).unwrap(); | ||
let actual = logical_plan_from_json(&input, &ctx).await.unwrap(); | ||
let result = matches!(actual, LogicalPlan::EmptyRelation(_)); | ||
assert!(result, "Should parse empty relation"); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is an assumption that the file_types are always lower case in
table_factories
perhaps would it make sense to update the comment to that effect?https://github.com/apache/arrow-datafusion/blob/6e0097d35391fea0d57c1d2ecfdef18437f681f4/datafusion/core/src/execution/runtime_env.rs#L48