Skip to content

Commit 04291e1

Browse files
committed
fix can not load parquet table form spark
1 parent 6ec18bb commit 04291e1

File tree

8 files changed

+35
-21
lines changed

8 files changed

+35
-21
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use datafusion::{
5555
};
5656

5757
use structopt::StructOpt;
58+
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
59+
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
5860

5961
#[cfg(feature = "snmalloc")]
6062
#[global_allocator]
@@ -652,13 +654,13 @@ fn get_table(
652654
.with_delimiter(b',')
653655
.with_has_header(true);
654656

655-
(Arc::new(format), path, ".csv")
657+
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
656658
}
657659
"parquet" => {
658660
let path = format!("{}/{}", path, table);
659661
let format = ParquetFormat::default().with_enable_pruning(true);
660662

661-
(Arc::new(format), path, ".parquet")
663+
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
662664
}
663665
other => {
664666
unimplemented!("Invalid file format '{}'", other);

datafusion-examples/examples/parquet_sql_multiple_files.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion::datasource::file_format::parquet::ParquetFormat;
18+
use datafusion::datasource::file_format::parquet::{DEFAULT_PARQUET_EXTENSION, ParquetFormat};
1919
use datafusion::datasource::listing::ListingOptions;
2020
use datafusion::error::Result;
2121
use datafusion::prelude::*;
@@ -33,7 +33,7 @@ async fn main() -> Result<()> {
3333
// Configure listing options
3434
let file_format = ParquetFormat::default().with_enable_pruning(true);
3535
let listing_options = ListingOptions {
36-
file_extension: ".parquet".to_owned(),
36+
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
3737
format: Arc::new(file_format),
3838
table_partition_cols: vec![],
3939
collect_stat: true,

datafusion/src/datasource/file_format/avro.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
3434
use crate::physical_plan::ExecutionPlan;
3535
use crate::physical_plan::Statistics;
3636

37+
/// The default file extension of avro files
38+
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
3739
/// Avro `FileFormat` implementation.
3840
#[derive(Default, Debug)]
3941
pub struct AvroFormat;

datafusion/src/datasource/file_format/csv.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
3333
use crate::physical_plan::ExecutionPlan;
3434
use crate::physical_plan::Statistics;
3535

36+
/// The default file extension of csv files
37+
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
3638
/// Character Separated Value `FileFormat` implementation.
3739
#[derive(Debug)]
3840
pub struct CsvFormat {

datafusion/src/datasource/file_format/json.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
3737
use crate::physical_plan::ExecutionPlan;
3838
use crate::physical_plan::Statistics;
3939

40+
/// The default file extension of json files
41+
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
4042
/// New line delimited JSON `FileFormat` implementation.
4143
#[derive(Debug, Default)]
4244
pub struct JsonFormat {

datafusion/src/datasource/listing/table.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ mod tests {
274274
logical_plan::{col, lit},
275275
test::{columns, object_store::TestObjectStore},
276276
};
277+
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
278+
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
277279

278280
use super::*;
279281

@@ -318,7 +320,7 @@ mod tests {
318320
let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);
319321

320322
let opt = ListingOptions {
321-
file_extension: ".avro".to_owned(),
323+
file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
322324
format: Arc::new(AvroFormat {}),
323325
table_partition_cols: vec![String::from("p1")],
324326
target_partitions: 4,
@@ -419,7 +421,7 @@ mod tests {
419421
let testdata = crate::test_util::parquet_test_data();
420422
let filename = format!("{}/{}", testdata, name);
421423
let opt = ListingOptions {
422-
file_extension: "parquet".to_owned(),
424+
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
423425
format: Arc::new(ParquetFormat::default()),
424426
table_partition_cols: vec![],
425427
target_partitions: 2,

datafusion/src/execution/context.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use crate::{
2424
datasource::listing::{ListingOptions, ListingTable},
2525
datasource::{
2626
file_format::{
27-
avro::AvroFormat,
28-
csv::CsvFormat,
27+
avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
28+
csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
2929
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
3030
FileFormat,
3131
},
@@ -209,17 +209,20 @@ impl ExecutionContext {
209209
ref file_type,
210210
ref has_header,
211211
}) => {
212-
let file_format = match file_type {
213-
FileType::CSV => {
214-
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
215-
as Arc<dyn FileFormat>)
216-
}
217-
FileType::Parquet => {
218-
Ok(Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>)
219-
}
220-
FileType::Avro => {
221-
Ok(Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>)
222-
}
212+
let (file_format, file_extension) = match file_type {
213+
FileType::CSV => Ok((
214+
Arc::new(CsvFormat::default().with_has_header(*has_header))
215+
as Arc<dyn FileFormat>,
216+
DEFAULT_CSV_EXTENSION,
217+
)),
218+
FileType::Parquet => Ok((
219+
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
220+
DEFAULT_PARQUET_EXTENSION,
221+
)),
222+
FileType::Avro => Ok((
223+
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
224+
DEFAULT_AVRO_EXTENSION,
225+
)),
223226
_ => Err(DataFusionError::NotImplemented(format!(
224227
"Unsupported file type {:?}.",
225228
file_type
@@ -229,7 +232,7 @@ impl ExecutionContext {
229232
let options = ListingOptions {
230233
format: file_format,
231234
collect_stat: false,
232-
file_extension: String::new(),
235+
file_extension: file_extension.to_owned(),
233236
target_partitions: self
234237
.state
235238
.lock()

datafusion/src/execution/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::datasource::{
2525
file_format::{avro::AvroFormat, csv::CsvFormat},
2626
listing::ListingOptions,
2727
};
28+
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
2829

2930
/// CSV file read option
3031
#[derive(Copy, Clone)]
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
173174
Self {
174175
schema: None,
175176
schema_infer_max_records: 1000,
176-
file_extension: ".json",
177+
file_extension: DEFAULT_JSON_EXTENSION,
177178
}
178179
}
179180
}

0 commit comments

Comments
 (0)