-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow listing tables to be created via TableFactories #4112
Conversation
@andygrove and @alamb sorry for the immediate follow-on PR. I did not realize we could merge Ballista PRs pointed at git refs of Datafusion or I would have done these two as a single PR. |
datafusion-cli/src/main.rs
Outdated
.ok_or(DataFusionError::Internal("Schema not found!".to_string()))?; | ||
let lister = schema.as_any().downcast_ref::<ListingSchemaProvider>(); | ||
if let Some(lister) = lister { | ||
lister.refresh(&ctx.state()).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Register tables on startup.
datafusion-cli/src/main.rs
Outdated
table_factories.insert("csv".to_string(), Arc::new(ListingTableFactory::new(FileType::CSV))); | ||
table_factories.insert("parquet".to_string(), Arc::new(ListingTableFactory::new(FileType::PARQUET))); | ||
table_factories.insert("avro".to_string(), Arc::new(ListingTableFactory::new(FileType::AVRO))); | ||
table_factories.insert("json".to_string(), Arc::new(ListingTableFactory::new(FileType::JSON))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb this is getting closer to something I think you asked for earlier: treating ListingTables
and custom tables the same way and just registering factories for table types.
@@ -72,7 +74,7 @@ impl ListingSchemaProvider { | |||
} | |||
|
|||
/// Reload table information from ObjectStore | |||
pub async fn refresh(&self) -> datafusion_common::Result<()> { | |||
pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to add SessionState
to allow ListingTable
s to load their schema.
@@ -100,13 +102,20 @@ impl ListingSchemaProvider { | |||
.ok_or_else(|| { | |||
DataFusionError::Internal("Cannot parse file name!".to_string()) | |||
})?; | |||
let table_name = table.to_str().ok_or_else(|| { | |||
let table_name = file_name.split('.').collect_vec()[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deltatables are always folders, but .csv
s should have their extension removed from their table name (and unfortunately the Path
method to do this is marked unstable).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which method? Maybe we can contribute something back upstream to object_store 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://doc.rust-lang.org/std/path/struct.PathBuf.html#method.file_prefix
For some reason the github UI didn't let me respond in thread until now :/
@andygrove I think you were requesting this feature ☝️ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good to me -- is there any way we can write a test for this functionality?
@@ -100,13 +102,20 @@ impl ListingSchemaProvider { | |||
.ok_or_else(|| { | |||
DataFusionError::Internal("Cannot parse file name!".to_string()) | |||
})?; | |||
let table_name = table.to_str().ok_or_else(|| { | |||
let table_name = file_name.split('.').collect_vec()[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which method? Maybe we can contribute something back upstream to object_store 🤔
) -> datafusion_common::Result<Arc<dyn TableProvider>> { | ||
let file_extension = self.file_type.get_ext(); | ||
|
||
let file_format: Arc<dyn FileFormat> = match self.file_type { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 it would be really neat to somehow combine the logic in ListingTable and ListingTableFactory (or maybe datafusion-cli could just use the factory -- not sure)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine the logic
I think this is the duplicated fragment. I would love to combine those two and only have register_table
instead of create_listing_table()
and create_custom_table()
.
I know tests are somewhat painful to write, but if we don't have them I worry about breaking the functionality at some future time Maybe making a test as an example in datafusion-examples https://github.com/apache/arrow-datafusion/tree/master/datafusion-examples/examples would be a good way to
|
Done. I also filed this: #4114 . It's my own personal fault, so I understand if we don't want to inflict that on everyone. |
@@ -142,11 +147,31 @@ pub async fn main() -> Result<()> { | |||
} | |||
|
|||
fn create_runtime_env() -> Result<RuntimeEnv> { | |||
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add TableFactory
s for default formats.
Err(e) => format!("{}", e), | ||
Ok(_) => "".to_string() | ||
}; | ||
assert_eq!("".to_string(), msg); // Fail with error message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refactored this test to have a better failure message because it was failing for me locally. The fact that CI passed makes me think it's not running there because datafusion-cli
is excluded from the workspace - I don't know why this is, but would propose we include it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please file a ticket to do so? Thank you!
@@ -175,6 +175,26 @@ impl SessionContext { | |||
Self::with_config(SessionConfig::new()) | |||
} | |||
|
|||
/// Finds any ListSchemaProviders and instructs them to reload tables from "disk" | |||
pub async fn refresh_catalogs(&self) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this code to a common function on the context which we can use from datafusion-cli
, tests, Ballista, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me
@@ -1764,7 +1784,7 @@ impl ContextProvider for SessionState { | |||
Ok(schema) => { | |||
let provider = schema.table(resolved_ref.table).ok_or_else(|| { | |||
DataFusionError::Plan(format!( | |||
"'{}.{}.{}' not found", | |||
"table '{}.{}.{}' not found", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not found
is a hard error message to ctrl-f for. Adding the word table
will hopefully make this statistically more likely to be found.
| Customer#000000002 | | ||
| Customer#000000003 | | ||
| Customer#000000004 | | ||
+--------------------+"#; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The proof is in the pudding. Can't select from a table without registering it first, so this must be auto-registered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me -- thank you @avantgardnerio
I also merged this PR to master locally and all the tests still pass for me
I left a few comments, but I don't think any of them are required to merge this PR
Err(e) => format!("{}", e), | ||
Ok(_) => "".to_string() | ||
}; | ||
assert_eq!("".to_string(), msg); // Fail with error message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please file a ticket to do so? Thank you!
@@ -175,6 +175,26 @@ impl SessionContext { | |||
Self::with_config(SessionConfig::new()) | |||
} | |||
|
|||
/// Finds any ListSchemaProviders and instructs them to reload tables from "disk" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Finds any ListSchemaProviders and instructs them to reload tables from "disk" | |
/// Invokes `ListingSchemaProvider::reload()` for all registered providers |
@@ -175,6 +175,26 @@ impl SessionContext { | |||
Self::with_config(SessionConfig::new()) | |||
} | |||
|
|||
/// Finds any ListSchemaProviders and instructs them to reload tables from "disk" | |||
pub async fn refresh_catalogs(&self) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me
plan_and_collect(&ctx, "select c_name from default.customer limit 3;") | ||
.await?; | ||
|
||
let actual = arrow::util::pretty::pretty_format_batches(&result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might also consider using assert_batches_eq
here in this test
Then if it's alright with you, I'd love to get it merged rather than wait for another round of CI checks. Thanks for thoroughly reviewing it! |
Benchmark runs are scheduled for baseline = c1fc732 and contender = 7b5842b. 7b5842b is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4111.
Rationale for this change
Described in issue.
What changes are included in this PR?
Described in issue.
Are there any user-facing changes?
They can register CSVs, parquets, jsons and avros from the datafusion-cli.