Add serde for plans with tables from TableProviderFactorys
#3907
Conversation
I'm at a loss as to how/where to test this. It makes things work in other repos by definition (e.g. delta-io/delta-rs#892). I think I just need to make a NoOpTableProvider and write a test that does serde on it? (@andygrove @alamb )
I added a roundtrip test. I think this covers it.
I don't fully understand this proposal -- I left some comments
datafusion/proto/src/logical_plan.rs
        filters,
        listing_table,
    )
} else if let Some(custom_table) = source.downcast_ref::<CustomTable>() {
So this seems to be the key issue -- if source isn't some type that DataFusion knows about, it can't do a useful job serializing it. I don't fully understand how a CustomTable struct will help -- at some point the client code needs to get the concrete TableProvider, not a CustomTable.
I think there was a similar issue serializing "extensions", for which @thinkharderdev added pub trait LogicalExtensionCodec: Debug + Send + Sync. I wonder if we can follow the same model here -- something like adding LogicalExtensionCodec::try_encode_table_provider and LogicalExtensionCodec::try_decode_table_provider:
if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
// ...
} else {
// delegate to provided extension codec:
extension_codec.try_encode_table_provider(
table_name,
source,
filters,
projection,
);
}
I don't fully understand this proposal
That delta PR really explains what I'm trying to do. Unfortunately the end goal spans datafusion, delta-rs & ballista, so it's hard to convey. And I'm open to doing it in simpler ways if they exist and fulfill the broader goal of supporting table types unknown to datafusion (e.g. delta, but possibly others).
the client code needs to get the concrete TableProvider
As long as the deserializing node also has the TableProviderFactory registered, it can get a concrete one created. The CustomTable is just a wrapper that contains enough information to get it instantiated by the factory.
I wonder if we can follow the same model here
Reviewing...
if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
// ...
} else {
I don't think this would fit with the goal, as datafusion would have to include types at compile time for each table, i.e. we'd have to make a compile-time dependency on delta-rs, which itself already has a compile-time dependency on datafusion.
have to include types at compile time
Unless that downcast_ref trick works on traits as well as structs.
I see it works via the dyn Any trait, so that seems okay. I was worried about having to add delta-rs stuff to the datafusion.proto, but I see pub struct TopKPlanProto gets away without modifying the protobuf definition. If you think this will work @alamb, I'll start work on another PR doing it with the codec instead.
I don't think this would fit with the goal, as datafusion would have to include types at compile time for each table, i.e. we'd have to make a compile time dependency on delta-rs which itself has a compile-time dependency on datafusion already.
Yeah, I think the "issue" is that delta-rs will have to provide its own serialization / deserialization
will have to provide its own serialization / deserialization
That works for me!
I think we'll need something like:
message LogicalPlanNode {
  oneof LogicalPlanType {
    CustomTableScanNode custom_scan = 25;
and
message CustomTableScanNode {
  bytes custom_table_data = 1;
}
Actually, it looks like we are conflating the scan data with the TableProvider data when we serialize. I split them out in CustomTableScanNode.
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
    HashMap::new();
table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
As long as the deserializing side has the same TableFactory registered, this will work.
I think that is perfect
Yeah -- this makes the most sense -- basically the system which made the table provider needs to provide details on how to serialize/deserialize it
ProjectionColumns projection = 2;
datafusion.Schema schema = 3;
repeated datafusion.LogicalExprNode filters = 4;
bytes custom_table_data = 5;
The above 4 fields are unrelated to the TableProvider -- they come from the scan. The ListingTableScan doesn't have this problem because it can happily combine the two logical units into one message, but unless we want each custom table type to reimplement this logic, we should probably keep it here.
Looks great to me @avantgardnerio -- thank you. The code and tests look good to me. The only thing I wonder about is the PhysicalExtensionCodec introduced -- otherwise I think this PR is ready to go.
@@ -418,19 +418,18 @@ impl SessionContext {
        cmd: &CreateExternalTable,
    ) -> Result<Arc<DataFrame>> {
        let state = self.state.read().clone();
        let file_type = cmd.file_type.to_lowercase();
If there is an assumption that the file_types are always lower case in table_factories, perhaps it would make sense to update the comment to that effect?
}

/// Deserialize a LogicalPlan from bytes
-pub fn logical_plan_from_bytes(
+pub async fn logical_plan_from_bytes(
It makes sense to me that these functions must become async in order to (potentially) instantiate a table provider 👍
To follow up -- this becomes async so that a table provider can be instantiated. A table provider, such as delta-rs, might have to do remote IO.
@@ -93,6 +100,22 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
        Self: Sized;
}

pub trait PhysicalExtensionCodec: Debug + Send + Sync {
Is this trait a left over? I didn't think this PR added support for serializing / deserializing physical plans
It was moved out of Ballista and into datafusion. I would think that, being a library, DataFusion would have to represent both physical and logical plans in memory, but not serialize them. For reasons with which I am not yet familiar, DataFusion contains the code to serialize logical plans, but Ballista is in charge of serializing physical plans, so this PhysicalExtensionCodec was defined there, and this PR moves it into DataFusion.
As for the reason why, libraries like delta-rs will want to extend these traits (TableProviderFactory, LogicalExtensionCodec, and PhysicalExtensionCodec) to provide their own tables to executables up the chain like Ballista, and to do this, the traits need to be defined below them in the dependency tree so they can implement them -- see https://github.com/spaceandtimelabs/delta-rs/blob/379558798b644d3a269514a99b322850d5d4b5a6/rust/src/delta_datafusion.rs#L754 for an example.
Eventually this series of PRs will end in a Ballista PR with a deltalake feature, and if specified it will bring in delta's implementation of the above traits and make delta tables registerable in SQL at runtime.
It is odd that we ended up with logical plan serde in DataFusion and physical plan serde in Ballista. I could see an argument for moving the physical plan serde into DataFusion, with Ballista extending this for its custom physical operators. @alamb What do you think?
I think putting physical plan serde for structs defined in DataFusion makes lots of sense to me 👍
I filed #3949 for this
    .iter()
    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
    .collect::<Result<_, DataFusionError>>()?;
let mut input_plans: Vec<LogicalPlan> = vec![];
Making something async and having to switch from a functional style is unfortunate -- one can also use streams, but that syntax (well, really the errors if you get it wrong) becomes wild.
) -> Result<(), DataFusionError>;
}

#[async_trait]
pub trait LogicalExtensionCodec: Debug + Send + Sync {
We wonder if we should rename LogicalExtensionCodec to something more generic, but we can do that in the future perhaps.
Thanks everyone who helped make this PR -- both the code and reviews 🚀
Thanks everyone, on to the next PR! :D
Hi @alamb @avantgardnerio, why did we change the interface of
I left a comment on the other PR... let's have the discussion there (so there's one place to track).
When we reach a resolution it would be great to add some comments with the rationale of which way we decided (
Which issue does this PR close?
Closes #3906.
Rationale for this change
Described in issue.
What changes are included in this PR?
Described in issue.
Are there any user-facing changes?
Plans relying on TableProviderFactorys can be shipped.