
Commit afa37f8

Merge remote-tracking branch 'apache/main' into alamb/sql_frontend
2 parents: c6944ce + ca9c322

56 files changed: 1762 additions & 691 deletions


datafusion-cli/Cargo.lock

Lines changed: 0 additions & 1 deletion (generated file; diff not rendered)

datafusion-cli/src/exec.rs

Lines changed: 19 additions & 12 deletions
@@ -21,7 +21,6 @@ use std::collections::HashMap;
2121
use std::fs::File;
2222
use std::io::prelude::*;
2323
use std::io::BufReader;
24-
use std::str::FromStr;
2524

2625
use crate::cli_context::CliSessionContext;
2726
use crate::helper::split_from_semicolon;
@@ -35,14 +34,14 @@ use crate::{
3534

3635
use datafusion::common::instant::Instant;
3736
use datafusion::common::plan_datafusion_err;
37+
use datafusion::config::ConfigFileType;
3838
use datafusion::datasource::listing::ListingTableUrl;
3939
use datafusion::error::{DataFusionError, Result};
4040
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
4141
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
4242
use datafusion::sql::parser::{DFParser, Statement};
4343
use datafusion::sql::sqlparser::dialect::dialect_from_str;
4444

45-
use datafusion::common::FileType;
4645
use datafusion::sql::sqlparser;
4746
use rustyline::error::ReadlineError;
4847
use rustyline::Editor;
@@ -291,6 +290,15 @@ impl AdjustedPrintOptions {
291290
}
292291
}
293292

293+
fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
294+
match ext.to_lowercase().as_str() {
295+
"csv" => Some(ConfigFileType::CSV),
296+
"json" => Some(ConfigFileType::JSON),
297+
"parquet" => Some(ConfigFileType::PARQUET),
298+
_ => None,
299+
}
300+
}
301+
294302
async fn create_plan(
295303
ctx: &mut dyn CliSessionContext,
296304
statement: Statement,
@@ -302,7 +310,7 @@ async fn create_plan(
302310
// will raise Configuration errors.
303311
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
304312
// To support custom formats, treat error as None
305-
let format = FileType::from_str(&cmd.file_type).ok();
313+
let format = config_file_type_from_str(&cmd.file_type);
306314
register_object_store_and_config_extensions(
307315
ctx,
308316
&cmd.location,
@@ -313,13 +321,13 @@ async fn create_plan(
313321
}
314322

315323
if let LogicalPlan::Copy(copy_to) = &mut plan {
316-
let format: FileType = (&copy_to.format_options).into();
324+
let format = config_file_type_from_str(&copy_to.file_type.get_ext());
317325

318326
register_object_store_and_config_extensions(
319327
ctx,
320328
&copy_to.output_url,
321329
&copy_to.options,
322-
Some(format),
330+
format,
323331
)
324332
.await?;
325333
}
@@ -357,7 +365,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
357365
ctx: &dyn CliSessionContext,
358366
location: &String,
359367
options: &HashMap<String, String>,
360-
format: Option<FileType>,
368+
format: Option<ConfigFileType>,
361369
) -> Result<()> {
362370
// Parse the location URL to extract the scheme and other components
363371
let table_path = ListingTableUrl::parse(location)?;
@@ -374,7 +382,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
374382
// Clone and modify the default table options based on the provided options
375383
let mut table_options = ctx.session_state().default_table_options().clone();
376384
if let Some(format) = format {
377-
table_options.set_file_format(format);
385+
table_options.set_config_format(format);
378386
}
379387
table_options.alter_with_string_hash_map(options)?;
380388

@@ -392,7 +400,6 @@ pub(crate) async fn register_object_store_and_config_extensions(
392400
mod tests {
393401
use super::*;
394402

395-
use datafusion::common::config::FormatOptions;
396403
use datafusion::common::plan_err;
397404

398405
use datafusion::prelude::SessionContext;
@@ -403,7 +410,7 @@ mod tests {
403410
let plan = ctx.state().create_logical_plan(sql).await?;
404411

405412
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
406-
let format = FileType::from_str(&cmd.file_type).ok();
413+
let format = config_file_type_from_str(&cmd.file_type);
407414
register_object_store_and_config_extensions(
408415
&ctx,
409416
&cmd.location,
@@ -429,12 +436,12 @@ mod tests {
429436
let plan = ctx.state().create_logical_plan(sql).await?;
430437

431438
if let LogicalPlan::Copy(cmd) = &plan {
432-
let format: FileType = (&cmd.format_options).into();
439+
let format = config_file_type_from_str(&cmd.file_type.get_ext());
433440
register_object_store_and_config_extensions(
434441
&ctx,
435442
&cmd.output_url,
436443
&cmd.options,
437-
Some(format),
444+
format,
438445
)
439446
.await?;
440447
} else {
@@ -484,7 +491,7 @@ mod tests {
484491
let mut plan = create_plan(&mut ctx, statement).await?;
485492
if let LogicalPlan::Copy(copy_to) = &mut plan {
486493
assert_eq!(copy_to.output_url, location);
487-
assert!(matches!(copy_to.format_options, FormatOptions::PARQUET(_)));
494+
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
488495
ctx.runtime_env()
489496
.object_store_registry
490497
.get_store(&Url::parse(&copy_to.output_url).unwrap())?;

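The diff replaces `FileType::from_str` with the new `config_file_type_from_str` helper: known extensions map to the built-in `ConfigFileType` variants and anything else becomes `None`, so custom formats still pass through. Below is a minimal standalone sketch of that mapping; the `ConfigFileType` enum here is a local stand-in (with `PartialEq` added for the asserts) rather than the one from `datafusion::config`, so the snippet compiles on its own.

```rust
// Standalone sketch of the extension-to-format mapping used above;
// `ConfigFileType` is a local stand-in, not the enum from `datafusion::config`.
#[derive(Debug, Clone, PartialEq)]
enum ConfigFileType {
    CSV,
    JSON,
    PARQUET,
}

fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
    // Case-insensitive match; unknown (e.g. custom) formats map to None.
    match ext.to_lowercase().as_str() {
        "csv" => Some(ConfigFileType::CSV),
        "json" => Some(ConfigFileType::JSON),
        "parquet" => Some(ConfigFileType::PARQUET),
        _ => None,
    }
}

fn main() {
    assert_eq!(config_file_type_from_str("CSV"), Some(ConfigFileType::CSV));
    assert_eq!(config_file_type_from_str("xlsx"), None);
}
```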
datafusion-examples/examples/external_dependency/dataframe-to-s3.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -20,10 +20,10 @@ use std::sync::Arc;
 
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use datafusion_common::{FileType, GetExt};
 
 use object_store::aws::AmazonS3Builder;
 use url::Url;
@@ -54,7 +54,7 @@ async fn main() -> Result<()> {
     let path = format!("s3://{bucket_name}/test_data/");
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(FileType::PARQUET.get_ext());
+        .with_file_extension(ParquetFormat::default().get_ext());
     ctx.register_listing_table("test", &path, listing_options, None, None)
         .await?;
 
@@ -79,7 +79,7 @@ async fn main() -> Result<()> {
 
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(FileType::PARQUET.get_ext());
+        .with_file_extension(ParquetFormat::default().get_ext());
     ctx.register_listing_table("test2", &out_path, listing_options, None, None)
         .await?;
```

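With `datafusion_common::FileType` removed, the example now asks the format implementation itself for its default extension through the `FileFormat` trait. A minimal sketch of that pattern, assuming the same DataFusion imports shown in the diff above:

```rust
// Sketch of the new pattern; assumes the DataFusion imports from the diff above.
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat; // brings `get_ext()` into scope
use datafusion::datasource::listing::ListingOptions;

fn parquet_listing_options() -> ListingOptions {
    let file_format = ParquetFormat::default().with_enable_pruning(true);
    // `ParquetFormat::get_ext()` yields the "parquet" extension that previously
    // came from `FileType::PARQUET.get_ext()`.
    ListingOptions::new(Arc::new(file_format))
        .with_file_extension(ParquetFormat::default().get_ext())
}
```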
datafusion/common/src/config.rs

Lines changed: 42 additions & 41 deletions
```diff
@@ -24,7 +24,7 @@ use std::str::FromStr;
 
 use crate::error::_config_err;
 use crate::parsers::CompressionTypeVariant;
-use crate::{DataFusionError, FileType, Result};
+use crate::{DataFusionError, Result};
 
 /// A macro that wraps a configuration struct and automatically derives
 /// [`Default`] and [`ConfigField`] for it, allowing it to be used
@@ -204,6 +204,11 @@ config_namespace! {
         /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
         pub dialect: String, default = "generic".to_string()
 
+        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
+        /// ignore the length. If false, error if a `VARCHAR` with a length is
+        /// specified. The Arrow type system does not have a notion of maximum
+        /// string length and thus DataFusion can not enforce such limits.
+        pub support_varchar_with_length: bool, default = true
     }
 }
 
@@ -303,6 +308,7 @@ config_namespace! {
         /// statistics into the same file groups.
         /// Currently experimental
         pub split_file_groups_by_statistics: bool, default = false
+
     }
 }
 
@@ -1116,6 +1122,16 @@ macro_rules! extensions_options {
     }
 }
 
+/// These file types have special built in behavior for configuration.
+/// Use TableOptions::Extensions for configuring other file types.
+#[derive(Debug, Clone)]
+pub enum ConfigFileType {
+    CSV,
+    #[cfg(feature = "parquet")]
+    PARQUET,
+    JSON,
+}
+
 /// Represents the configuration options available for handling different table formats within a data processing application.
 /// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
 /// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
@@ -1134,7 +1150,7 @@ pub struct TableOptions {
 
     /// The current file format that the table operations should assume. This option allows
     /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
-    pub current_format: Option<FileType>,
+    pub current_format: Option<ConfigFileType>,
 
     /// Optional extensions that can be used to extend or customize the behavior of the table
     /// options. Extensions can be registered using `Extensions::insert` and might include
@@ -1152,10 +1168,9 @@ impl ConfigField for TableOptions {
         if let Some(file_type) = &self.current_format {
             match file_type {
                 #[cfg(feature = "parquet")]
-                FileType::PARQUET => self.parquet.visit(v, "format", ""),
-                FileType::CSV => self.csv.visit(v, "format", ""),
-                FileType::JSON => self.json.visit(v, "format", ""),
-                _ => {}
+                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
+                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
+                ConfigFileType::JSON => self.json.visit(v, "format", ""),
             }
         } else {
             self.csv.visit(v, "csv", "");
@@ -1188,12 +1203,9 @@ impl ConfigField for TableOptions {
         match key {
             "format" => match format {
                 #[cfg(feature = "parquet")]
-                FileType::PARQUET => self.parquet.set(rem, value),
-                FileType::CSV => self.csv.set(rem, value),
-                FileType::JSON => self.json.set(rem, value),
-                _ => {
-                    _config_err!("Config value \"{key}\" is not supported on {}", format)
-                }
+                ConfigFileType::PARQUET => self.parquet.set(rem, value),
+                ConfigFileType::CSV => self.csv.set(rem, value),
+                ConfigFileType::JSON => self.json.set(rem, value),
             },
             _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
         }
@@ -1210,15 +1222,6 @@ impl TableOptions {
         Self::default()
     }
 
-    /// Sets the file format for the table.
-    ///
-    /// # Parameters
-    ///
-    /// * `format`: The file format to use (e.g., CSV, Parquet).
-    pub fn set_file_format(&mut self, format: FileType) {
-        self.current_format = Some(format);
-    }
-
     /// Creates a new `TableOptions` instance initialized with settings from a given session config.
     ///
     /// # Parameters
@@ -1249,6 +1252,15 @@ impl TableOptions {
         clone
     }
 
+    /// Sets the file format for the table.
+    ///
+    /// # Parameters
+    ///
+    /// * `format`: The file format to use (e.g., CSV, Parquet).
+    pub fn set_config_format(&mut self, format: ConfigFileType) {
+        self.current_format = Some(format);
+    }
+
     /// Sets the extensions for this `TableOptions` instance.
     ///
     /// # Parameters
@@ -1673,6 +1685,8 @@ config_namespace! {
     }
 }
 
+pub trait FormatOptionsExt: Display {}
+
 #[derive(Debug, Clone, PartialEq)]
 #[allow(clippy::large_enum_variant)]
 pub enum FormatOptions {
@@ -1698,28 +1712,15 @@ impl Display for FormatOptions {
     }
 }
 
-impl From<FileType> for FormatOptions {
-    fn from(value: FileType) -> Self {
-        match value {
-            FileType::ARROW => FormatOptions::ARROW,
-            FileType::AVRO => FormatOptions::AVRO,
-            #[cfg(feature = "parquet")]
-            FileType::PARQUET => FormatOptions::PARQUET(TableParquetOptions::default()),
-            FileType::CSV => FormatOptions::CSV(CsvOptions::default()),
-            FileType::JSON => FormatOptions::JSON(JsonOptions::default()),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::any::Any;
     use std::collections::HashMap;
 
     use crate::config::{
-        ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
+        ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
+        TableOptions,
     };
-    use crate::FileType;
 
     #[derive(Default, Debug, Clone)]
     pub struct TestExtensionConfig {
@@ -1777,7 +1778,7 @@ mod tests {
         let mut extension = Extensions::new();
         extension.insert(TestExtensionConfig::default());
         let mut table_config = TableOptions::new().with_extensions(extension);
-        table_config.set_file_format(FileType::CSV);
+        table_config.set_config_format(ConfigFileType::CSV);
         table_config.set("format.delimiter", ";").unwrap();
         assert_eq!(table_config.csv.delimiter, b';');
         table_config.set("test.bootstrap.servers", "asd").unwrap();
@@ -1794,7 +1795,7 @@ mod tests {
     #[test]
     fn csv_u8_table_options() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::CSV);
+        table_config.set_config_format(ConfigFileType::CSV);
         table_config.set("format.delimiter", ";").unwrap();
         assert_eq!(table_config.csv.delimiter as char, ';');
         table_config.set("format.escape", "\"").unwrap();
@@ -1807,7 +1808,7 @@ mod tests {
     #[test]
     fn parquet_table_options() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config
             .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
@@ -1821,7 +1822,7 @@ mod tests {
     #[test]
     fn parquet_table_options_config_entry() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config
             .set("format.bloom_filter_enabled::col1", "true")
             .unwrap();
@@ -1835,7 +1836,7 @@ mod tests {
     #[test]
     fn parquet_table_options_config_metadata_entry() {
         let mut table_config = TableOptions::new();
-        table_config.set_file_format(FileType::PARQUET);
+        table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.set("format.metadata::key1", "").unwrap();
         table_config.set("format.metadata::key2", "value2").unwrap();
         table_config
```

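For callers of `TableOptions`, the visible change is that the active format is now selected with `set_config_format(ConfigFileType)` instead of the removed `set_file_format(FileType)` before any `format.*` keys are set. A minimal sketch mirroring the updated tests, assuming `ConfigFileType` and `TableOptions` are exported from `datafusion_common::config`:

```rust
// Hedged sketch mirroring the updated tests above; assumes these items are
// exported from `datafusion_common::config`.
use datafusion_common::config::{ConfigFileType, TableOptions};
use datafusion_common::Result;

fn csv_options_example() -> Result<()> {
    let mut table_options = TableOptions::new();
    // Selecting CSV makes the `format.` prefix apply to the CSV options.
    table_options.set_config_format(ConfigFileType::CSV);
    table_options.set("format.delimiter", ";")?;
    assert_eq!(table_options.csv.delimiter, b';');
    Ok(())
}
```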