Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow listing tables to be created via TableFactories #4112

Merged
merged 2 commits into from
Nov 7, 2022

Conversation

avantgardnerio
Copy link
Contributor

Which issue does this PR close?

Closes #4111.

Rationale for this change

Described in issue.

What changes are included in this PR?

Described in issue.

Are there any user-facing changes?

They can register CSVs, parquets, jsons and avros from the datafusion-cli.

@github-actions github-actions bot added the core Core DataFusion crate label Nov 4, 2022
@avantgardnerio
Copy link
Contributor Author

@andygrove and @alamb sorry for the immediate follow-on PR. I did not realize we could merge Ballista PRs pointed at git refs of Datafusion or I would have done these two as a single PR.

.ok_or(DataFusionError::Internal("Schema not found!".to_string()))?;
let lister = schema.as_any().downcast_ref::<ListingSchemaProvider>();
if let Some(lister) = lister {
lister.refresh(&ctx.state()).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Register tables on startup.

table_factories.insert("csv".to_string(), Arc::new(ListingTableFactory::new(FileType::CSV)));
table_factories.insert("parquet".to_string(), Arc::new(ListingTableFactory::new(FileType::PARQUET)));
table_factories.insert("avro".to_string(), Arc::new(ListingTableFactory::new(FileType::AVRO)));
table_factories.insert("json".to_string(), Arc::new(ListingTableFactory::new(FileType::JSON)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb this is getting closer to something I think you asked for earlier: treating ListingTables and custom tables the same way and just registering factories for table types.

@@ -72,7 +74,7 @@ impl ListingSchemaProvider {
}

/// Reload table information from ObjectStore
pub async fn refresh(&self) -> datafusion_common::Result<()> {
pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to add SessionState to allow ListingTables to load their schema.

@@ -100,13 +102,20 @@ impl ListingSchemaProvider {
.ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?;
let table_name = table.to_str().ok_or_else(|| {
let table_name = file_name.split('.').collect_vec()[0];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deltatables are always folders, but .csvs should have their extension removed from their table name (and unfortunately the Path method to do this is marked unstable).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which method? Maybe we can contribute something back upstream to object_store 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://doc.rust-lang.org/std/path/struct.PathBuf.html#method.file_prefix

For some reason the github UI didn't let me respond in thread until now :/

@avantgardnerio
Copy link
Contributor Author

Screenshot from 2022-11-04 14-14-03

@avantgardnerio
Copy link
Contributor Author

avantgardnerio commented Nov 4, 2022

Screenshot from 2022-11-04 14-13-55

@andygrove I think you were requesting this feature ☝️

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks good to me -- is there any way we can write a test for this functionality?

@@ -100,13 +102,20 @@ impl ListingSchemaProvider {
.ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?;
let table_name = table.to_str().ok_or_else(|| {
let table_name = file_name.split('.').collect_vec()[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which method? Maybe we can contribute something back upstream to object_store 🤔

) -> datafusion_common::Result<Arc<dyn TableProvider>> {
let file_extension = self.file_type.get_ext();

let file_format: Arc<dyn FileFormat> = match self.file_type {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 it would be really neat to somehow combine the logic in ListingTable and ListingTableFactory (or maybe datafusion-cli could just use the factory -- not sure)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combine the logic

I think this is the duplicated fragment. I would love to combine those two and only have register_table instead of create_listing_table() and create_custom_table().

@alamb
Copy link
Contributor

alamb commented Nov 5, 2022

I know tests are somewhat painful to write, but if we don't have them I worry about breaking the functionality at some future time

Maybe making a test as an example in datafusion-examples https://github.com/apache/arrow-datafusion/tree/master/datafusion-examples/examples would be a good way to

  1. document a bit more how to use these features,
  2. Ensure whatever APIs you need are properly exposed in the API
  3. Ensure we don't break the feature when we reorganize things in the future

@avantgardnerio
Copy link
Contributor Author

write a test

Done. I also filed this: #4114 . It's my own personal fault, so I understand if we don't want to inflict that on everyone.

@avantgardnerio
Copy link
Contributor Author

@@ -142,11 +147,31 @@ pub async fn main() -> Result<()> {
}

fn create_runtime_env() -> Result<RuntimeEnv> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add TableFactorys for default formats.

Err(e) => format!("{}", e),
Ok(_) => "".to_string()
};
assert_eq!("".to_string(), msg); // Fail with error message
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored this test to have a better failure message because it was failing for me locally. The fact that CI passed makes me think it's not running there because datafusion-cli is excluded from the workspace - I don't know why this is, but would propose we include it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please file a ticket to do so? Thank you!

@@ -175,6 +175,26 @@ impl SessionContext {
Self::with_config(SessionConfig::new())
}

/// Finds any ListSchemaProviders and instructs them to reload tables from "disk"
pub async fn refresh_catalogs(&self) -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this code to a common function on the context which we can use from datafusion-cli, tests, Ballista, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to me

@@ -1764,7 +1784,7 @@ impl ContextProvider for SessionState {
Ok(schema) => {
let provider = schema.table(resolved_ref.table).ok_or_else(|| {
DataFusionError::Plan(format!(
"'{}.{}.{}' not found",
"table '{}.{}.{}' not found",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not found is a hard error message to ctrl-f for. Adding the word table will hopefully make this statistically more likely to be found.

| Customer#000000002 |
| Customer#000000003 |
| Customer#000000004 |
+--------------------+"#;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proof is in the pudding. Can't select from a table without registering it first, so this must be auto-registered.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me -- thank you @avantgardnerio

I also merged this PR to master locally and all the tests still pass for me

I left a few comments, but I don't think any of them are required to merge this PR

Err(e) => format!("{}", e),
Ok(_) => "".to_string()
};
assert_eq!("".to_string(), msg); // Fail with error message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please file a ticket to do so? Thank you!

@@ -175,6 +175,26 @@ impl SessionContext {
Self::with_config(SessionConfig::new())
}

/// Finds any ListSchemaProviders and instructs them to reload tables from "disk"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Finds any ListSchemaProviders and instructs them to reload tables from "disk"
/// Invokes `ListingSchemaProvider::reload()` for all registered providers

@@ -175,6 +175,26 @@ impl SessionContext {
Self::with_config(SessionConfig::new())
}

/// Finds any ListSchemaProviders and instructs them to reload tables from "disk"
pub async fn refresh_catalogs(&self) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to me

plan_and_collect(&ctx, "select c_name from default.customer limit 3;")
.await?;

let actual = arrow::util::pretty::pretty_format_batches(&result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might also consider using assert_batches_eq here in this test

@avantgardnerio
Copy link
Contributor Author

I don't think any of them are required to merge this PR

Then if it's alright with you, I'd love to get it merged rather than wait for another round of CI checks. Thanks for thoroughly reviewing it!

@alamb alamb merged commit 7b5842b into apache:master Nov 7, 2022
@ursabot
Copy link

ursabot commented Nov 7, 2022

Benchmark runs are scheduled for baseline = c1fc732 and contender = 7b5842b. 7b5842b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow for automatic registration of ListingTables
3 participants