
Add serde for plans with tables from TableProviderFactorys #3907

Merged
merged 10 commits into from
Oct 24, 2022

Conversation

avantgardnerio
Contributor

Which issue does this PR close?

Closes #3906.

Rationale for this change

Described in issue.

What changes are included in this PR?

Described in issue.

Are there any user-facing changes?

Plans relying on TableProviderFactorys can be shipped.

@github-actions github-actions bot added the core Core DataFusion crate label Oct 20, 2022
@avantgardnerio
Contributor Author

I'm at a loss as to how/where to test this. It makes things work in other repos by definition (i.e. delta-io/delta-rs#892). I think I just need to make a NoOpTableProvider and write a test that does serde on it? (@andygrove @alamb )

@avantgardnerio
Contributor Author

at a loss as to how/where to test this

I added a roundtrip test. I think this covers it.

Contributor

@alamb alamb left a comment


I don't fully understand this proposal -- I left some comments

filters,
listing_table,
)
} else if let Some(custom_table) = source.downcast_ref::<CustomTable>() {
Contributor


So this seems to be the key issue -- if source isn't some type that DataFusion knows about it can't do a useful job serializing it.

I don't fully understand how a CustomTable struct will help -- at some point the client code needs to get the concrete TableProvider, not a CustomTable.

I think there was a similar issue serializing "extensions" for which @thinkharderdev added

pub trait LogicalExtensionCodec: Debug + Send + Sync {

I wonder if we can follow the same model here, something like adding LogicalExtensionCodec::try_encode_table_provider and LogicalExtensionCodec::try_decode_table_provider

if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
  // ...
} else {
  // delegate to provided extension codec:
  extension_codec.try_encode_table_provider(
    table_name,
    source,
    filters,
    projection,
  );
}
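To make the shape of that suggestion concrete, here is a minimal, self-contained sketch of such a codec with a pair of encode/decode hooks for table providers. The trait and type names mirror the suggestion, but the signatures and the `DeltaTable`/`DeltaCodec` types are simplified stand-ins, not the actual DataFusion API:

```rust
use std::fmt::Debug;

// Toy stand-in for DataFusion's TableProvider trait.
trait TableProvider: Debug {
    fn name(&self) -> String;
}

#[derive(Debug)]
struct DeltaTable {
    location: String,
}

impl TableProvider for DeltaTable {
    fn name(&self) -> String {
        self.location.clone()
    }
}

// Sketch of the proposed codec hooks: encode a provider to bytes, and
// rebuild one from those bytes on the deserializing side.
trait LogicalExtensionCodec: Debug {
    fn try_encode_table_provider(&self, provider: &dyn TableProvider) -> Result<Vec<u8>, String>;
    fn try_decode_table_provider(&self, buf: &[u8]) -> Result<Box<dyn TableProvider>, String>;
}

#[derive(Debug)]
struct DeltaCodec;

impl LogicalExtensionCodec for DeltaCodec {
    fn try_encode_table_provider(&self, provider: &dyn TableProvider) -> Result<Vec<u8>, String> {
        // A real codec would serialize whatever the provider needs to be
        // reconstructed; here the table location alone suffices.
        Ok(provider.name().into_bytes())
    }

    fn try_decode_table_provider(&self, buf: &[u8]) -> Result<Box<dyn TableProvider>, String> {
        let location = String::from_utf8(buf.to_vec()).map_err(|e| e.to_string())?;
        Ok(Box::new(DeltaTable { location }))
    }
}

fn main() {
    let codec = DeltaCodec;
    let table = DeltaTable {
        location: "s3://bucket/table".to_string(),
    };
    let bytes = codec.try_encode_table_provider(&table).unwrap();
    let decoded = codec.try_decode_table_provider(&bytes).unwrap();
    assert_eq!(decoded.name(), "s3://bucket/table");
}
```

The key property is that DataFusion only sees the trait; a crate like delta-rs can supply its own codec implementation without DataFusion depending on it.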

Contributor Author

@avantgardnerio avantgardnerio Oct 20, 2022


I don't fully understand this proposal

That delta PR really explains what I'm trying to do. Unfortunately the end goal spans datafusion, delta-rs & ballista, so it's hard to convey. I'm open to doing it simpler ways if they exist and fulfill the broader goal of supporting table types unknown to datafusion (e.g. delta, but possibly others).

the client code needs to get the concrete TableProvider

As long as the deserializing node also has the TableProviderFactory registered, it can get a concrete one created. The CustomTable is just a wrapper that contains enough information to get it instantiated by the factory.

Contributor Author


I wonder if we can follow the same model here

Reviewing...

Contributor Author


if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
  // ...
} else {

I don't think this would fit with the goal, as datafusion would have to include types at compile time for each table, i.e. we'd have to add a compile-time dependency on delta-rs, which itself already has a compile-time dependency on datafusion.

Contributor Author


have to include types at compile time

Unless that downcast_ref:: trick works on traits as well as structs.

Contributor Author


I see it works via the dyn Any trait, so that seems okay. I was worried about having to add delta-rs stuff to the datafusion.proto, but I see pub struct TopKPlanProto gets away without modifying the protobuf definition. If you think this will work @alamb I'll start work on another PR doing it with the codec instead.
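For illustration, here is a minimal, self-contained sketch of the `dyn Any` pattern referred to above: a trait object exposes an `as_any` hook, so a serializer can probe for the concrete types it knows and fall through otherwise. The `ListingTable`/`UnknownTable` types here are toy stand-ins:

```rust
use std::any::Any;

// Trait objects can't be downcast directly, but exposing `as_any`
// lets callers try a downcast to a known concrete type.
trait TableProvider {
    fn as_any(&self) -> &dyn Any;
}

struct ListingTable {
    path: String,
}
struct UnknownTable;

impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl TableProvider for UnknownTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Returns Some(path) only when the provider is actually a ListingTable.
fn probe(source: &dyn TableProvider) -> Option<&str> {
    source
        .as_any()
        .downcast_ref::<ListingTable>()
        .map(|t| t.path.as_str())
}

fn main() {
    let listing = ListingTable { path: "/data".to_string() };
    let unknown = UnknownTable;
    assert_eq!(probe(&listing), Some("/data"));
    // An unknown type falls through, e.g. to an extension codec path.
    assert_eq!(probe(&unknown), None);
}
```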

Contributor


I don't think this would fit with the goal, as datafusion would have to include types at compile time for each table, i.e. we'd have to make a compile time dependency on delta-rs which itself has a compile-time dependency on datafusion already.

Yeah, I think the "issue" is that delta-rs will have to provide its own serialization / deserialization

Contributor Author


will have to provide its own serialization / deserialization

That works for me!

Contributor Author


I think we'll need something like:

message LogicalPlanNode {
  oneof LogicalPlanType {
    CustomTableScanNode custom_scan = 25;

and

message CustomTableScanNode {
  bytes custom_table_data = 1;
}

Contributor Author

@avantgardnerio avantgardnerio Oct 20, 2022


Actually, it looks like we are conflating the scan data with the TableProvider data when we serialize. I split them out in CustomTableScanNode.

let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
    HashMap::new();
table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
Contributor Author


As long as the deserializing side has the same TableFactory registered, this will work.
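A minimal, self-contained sketch of that requirement: deserialization looks up a factory by its registered key and fails if no matching factory exists. The `TableProviderFactory`/`TestTableFactory` names echo the snippet above, but the signatures here are simplified stand-ins:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Toy stand-in for a factory keyed by table type, mirroring the
// `table_factories` registration shown above.
trait TableProviderFactory {
    // Returns a description instead of a real TableProvider.
    fn create(&self, url: &str) -> String;
}

struct TestTableFactory;

impl TableProviderFactory for TestTableFactory {
    fn create(&self, url: &str) -> String {
        format!("deltatable at {url}")
    }
}

// Deserializing side: succeeds only if the same key is registered.
fn instantiate(
    factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
    table_type: &str,
    url: &str,
) -> Result<String, String> {
    factories
        .get(table_type)
        .map(|f| f.create(url))
        .ok_or_else(|| format!("no factory registered for {table_type}"))
}

fn main() {
    let mut factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
    factories.insert("deltatable".to_string(), Arc::new(TestTableFactory));
    assert!(instantiate(&factories, "deltatable", "s3://bucket/t").is_ok());
    // A node that never registered the factory cannot rebuild the table.
    assert!(instantiate(&factories, "iceberg", "s3://bucket/t").is_err());
}
```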

Contributor


I think that is perfect

Contributor


Yeah -- this makes the most sense -- basically the system which made the table provider needs to provide details on how to serialize/deserialize it

ProjectionColumns projection = 2;
datafusion.Schema schema = 3;
repeated datafusion.LogicalExprNode filters = 4;
bytes custom_table_data = 5;
Contributor Author


The above 4 fields are unrelated to the TableProvider - they come from the scan. The ListingTableScan doesn't have this problem because it can happily combine the two logical units into one message, but unless we want each custom table type to reimplement this logic, we should probably keep it here.

Failing on scheduler due to no factories

Tests pass

Back to "no object store available for delta-rs://home-bgardner-workspace"

Switch back to git refs

CI fixes

Add roundtrip test

Passing deltalake test

Passing serde test

Remove unrelated refactor

Formatting

Fix typo that was hard to debug

CI fixes

delta & ballista tests pass
Brent Gardner and others added 2 commits October 24, 2022 10:51
Co-authored-by: xudong.w <wxd963996380@gmail.com>
@avantgardnerio avantgardnerio requested review from xudong963 and removed request for alamb October 24, 2022 18:28
Contributor

@alamb alamb left a comment


looks great to me @avantgardnerio -- thank you. The code and tests look good to me

The only thing I wonder about is the PhysicalExtensionCodec introduced -- otherwise I think this PR is ready to go

@@ -418,19 +418,18 @@ impl SessionContext {
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let state = self.state.read().clone();
let file_type = cmd.file_type.to_lowercase();
Contributor


If there is an assumption that the file_types are always lower case in table_factories, perhaps it would make sense to update the comment to that effect?

https://github.com/apache/arrow-datafusion/blob/6e0097d35391fea0d57c1d2ecfdef18437f681f4/datafusion/core/src/execution/runtime_env.rs#L48

}

/// Deserialize a LogicalPlan from bytes
pub fn logical_plan_from_bytes(
pub async fn logical_plan_from_bytes(
Contributor


It makes sense to me that these functions must become async in order to (potentially) instantiate a table provider 👍

Contributor


To follow up -- becomes async so that a table provider can be instantiated. This table provider, such as delta-rs, might have to do remote IO


@@ -93,6 +100,22 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
Self: Sized;
}

pub trait PhysicalExtensionCodec: Debug + Send + Sync {
Contributor


Is this trait a left over? I didn't think this PR added support for serializing / deserializing physical plans

Contributor Author


It was moved out of Ballista and into datafusion. I would think that, being a library, DataFusion would have to represent both physical and logical plans in memory, but not serialize them. For reasons with which I am not yet familiar, DataFusion contains the code to serialize logical plans, but Ballista is in charge of serializing physical plans, so this PhysicalExtensionCodec was defined there, and this PR moves it into DataFusion.

As for the reason why, libraries like delta-rs will want to extend these traits (TableProviderFactory, LogicalExtensionCodec, and PhysicalExtensionCodec) to provide their own tables to executables up the chain like Ballista, and to do this, it needs to be defined below them in the dependency tree so they can implement it... see https://github.com/spaceandtimelabs/delta-rs/blob/379558798b644d3a269514a99b322850d5d4b5a6/rust/src/delta_datafusion.rs#L754 for an example.

Eventually this series of PRs will end in a Ballista PR with a deltalake feature and if specified it will bring in delta's implementation of the above traits and make delta tables registerable in SQL at runtime.
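The layering argument above can be sketched in one self-contained file, with modules standing in for crates: the trait lives "below" (DataFusion), an implementation lives "above" (delta-rs), and the executable layer (Ballista) works purely against the trait object. The module and type names are illustrative stand-ins, and `try_decode` is reduced to a trivial signature:

```rust
// "datafusion" layer: defines the extension point, knows nothing
// about the crates that will implement it.
mod datafusion_like {
    pub trait PhysicalExtensionCodec {
        fn try_decode(&self, buf: &[u8]) -> Result<String, String>;
    }
}

// "delta-rs" layer: depends on the layer above and plugs in its
// own serialization logic.
mod delta_like {
    use super::datafusion_like::PhysicalExtensionCodec;

    pub struct DeltaPhysicalCodec;

    impl PhysicalExtensionCodec for DeltaPhysicalCodec {
        fn try_decode(&self, buf: &[u8]) -> Result<String, String> {
            String::from_utf8(buf.to_vec()).map_err(|e| e.to_string())
        }
    }
}

fn main() {
    use datafusion_like::PhysicalExtensionCodec;
    // "ballista" layer: sees only the trait object, so any table
    // implementation can be swapped in at runtime.
    let codec: Box<dyn PhysicalExtensionCodec> = Box::new(delta_like::DeltaPhysicalCodec);
    assert_eq!(codec.try_decode(b"delta scan").unwrap(), "delta scan");
}
```

Because the trait is defined in the lowest layer, delta-rs can implement it without a circular dependency, which is the constraint discussed earlier in this thread.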

Member


It is odd that we ended up with logical plan serde in DataFusion and physical plan serde in Ballista. I could see an argument for moving the physical plan serde into DataFusion, with Ballista extending this for its custom physical operators. @alamb What do you think?

Contributor


I think putting physical plan serde for structs defined in DataFusion makes lots of sense to me 👍

Member


I filed #3949 for this

.iter()
.map(|i| i.try_into_logical_plan(ctx, extension_codec))
.collect::<Result<_, DataFusionError>>()?;
let mut input_plans: Vec<LogicalPlan> = vec![];
Contributor


making something async and having to switch from a functional style is unfortunate -- one can also make streams but that syntax (well really the errors if you get it wrong) becomes wild

) -> Result<(), DataFusionError>;
}

#[async_trait]
pub trait LogicalExtensionCodec: Debug + Send + Sync {
Contributor


We wonder if we should rename LogicalExtensionCodec to something more generic, but we can do that in the future perhaps

Brent Gardner and others added 2 commits October 24, 2022 14:11
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@alamb
Contributor

alamb commented Oct 24, 2022

Thanks everyone who helped make this PR -- both the code and reviews 🚀

@alamb alamb merged commit 9595b8d into apache:master Oct 24, 2022
@avantgardnerio avantgardnerio deleted the bg_custom_tables branch October 24, 2022 21:25
@avantgardnerio
Contributor Author

Thanks everyone, on to the next PR! :D

@yahoNanJing
Contributor

Hi @alamb @avantgardnerio, why did we change the interface of try_into_logical_plan to be async?

@avantgardnerio
Contributor Author

why we change the interface

I left a comment on the other PR... let's have the discussion there (so there's one place to track).

@alamb
Contributor

alamb commented Oct 25, 2022

When we reach a resolution it would be great to add some comments with the rationale of which way we decided (async or non async) as well

@ursabot

ursabot commented Oct 25, 2022

Benchmark runs are scheduled for baseline = e1f866e and contender = 9595b8d. 9595b8d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Dandandan pushed a commit to yuuch/arrow-datafusion that referenced this pull request Nov 5, 2022
Labels
core Core DataFusion crate

Successfully merging this pull request may close these issues.

Plans with tables from TableProviderFactorys can't be serialized
6 participants