Skip to content

Add a hint about expected extension in error message in register_csv,… #14168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ impl SessionContext {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

self.register_listing_table(
table_ref,
table_path,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ impl SessionContext {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

self.register_listing_table(
table_ref,
table_path,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ impl SessionContext {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

self.register_listing_table(
table_ref,
table_path,
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,29 @@ impl SessionContext {
Ok(())
}

fn register_type_check<P: DataFilePaths>(
&self,
table_paths: P,
extension: impl AsRef<str>,
) -> Result<()> {
let table_paths = table_paths.to_urls()?;
if table_paths.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, what if the folder contains mixed parquet files and some .txt will that error trigger? That would be not very expected behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this good point! I checked this. If folder contains mixed format files, that will not error trigger!
I agree that if it failed, it would not be expected behavior. This code helps to prevent this. I found it in _read_type (https://docs.rs/datafusion/latest/src/datafusion/execution/context/mod.rs.html#1261)

  if !file_path.ends_with(extension) && !path.is_collection() {
      return exec_err!(
          "File path '{file_path}' does not match the expected extension '{extension}'"
      );
  }

return exec_err!("No table paths were provided");
}

// check if the file extension matches the expected extension
let extension = extension.as_ref();
for path in &table_paths {
let file_path = path.as_str();
if !file_path.ends_with(extension) && !path.is_collection() {
return exec_err!(
"File path '{file_path}' does not match the expected extension '{extension}'"
);
}
}
Ok(())
}

/// Registers an Arrow file as a table that can be referenced from
/// SQL statements executed against this context.
pub async fn register_arrow(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ impl SessionContext {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

self.register_listing_table(
table_ref,
table_path,
Expand Down
65 changes: 63 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::{ColumnarValue, Volatility};
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::prelude::{JoinType, NdJsonReadOptions};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, JoinType, NdJsonReadOptions, ParquetReadOptions,
};
use datafusion::test_util::{
parquet_test_data, populate_csv_partitions, register_aggregate_csv, test_table,
test_table_with_name,
Expand Down Expand Up @@ -5121,3 +5122,63 @@ async fn test_alias_nested() -> Result<()> {
);
Ok(())
}

#[tokio::test]
async fn register_non_json_file() {
let ctx = SessionContext::new();
let err = ctx
.register_json(
"data",
"tests/data/test_binary.parquet",
NdJsonReadOptions::default(),
)
.await;
assert_contains!(
err.unwrap_err().to_string(),
"test_binary.parquet' does not match the expected extension '.json'"
);
}

#[tokio::test]
async fn register_non_csv_file() {
let ctx = SessionContext::new();
let err = ctx
.register_csv(
"data",
"tests/data/test_binary.parquet",
CsvReadOptions::default(),
)
.await;
assert_contains!(
err.unwrap_err().to_string(),
"test_binary.parquet' does not match the expected extension '.csv'"
);
}

#[tokio::test]
async fn register_non_avro_file() {
let ctx = SessionContext::new();
let err = ctx
.register_avro(
"data",
"tests/data/test_binary.parquet",
AvroReadOptions::default(),
)
.await;
assert_contains!(
err.unwrap_err().to_string(),
"test_binary.parquet' does not match the expected extension '.avro'"
);
}

#[tokio::test]
async fn register_non_parquet_file() {
let ctx = SessionContext::new();
let err = ctx
.register_parquet("data", "tests/data/1.json", ParquetReadOptions::default())
.await;
assert_contains!(
err.unwrap_err().to_string(),
"1.json' does not match the expected extension '.parquet'"
);
}
Loading