Skip to content

Commit 9d81601

Browse files
committed
Fix failing tests
1 parent d64de7e commit 9d81601

File tree

9 files changed

+49
-136
lines changed

9 files changed

+49
-136
lines changed

datafusion/common/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,7 @@ config_namespace! {
15661566
pub struct CsvOptions {
15671567
/// Specifies whether there is a CSV header (i.e. the first line
15681568
/// consists of column names). If not specified, uses default from
1569-
/// the `CREATE TABLE` command, if any.
1569+
/// the session state, if any.
15701570
pub has_header: Option<bool>, default = None
15711571
pub delimiter: u8, default = b','
15721572
pub quote: u8, default = b'"'
@@ -1609,8 +1609,8 @@ impl CsvOptions {
16091609

16101610
/// Returns true if the first line is a header. If format options do not
16111611
/// specify whether there is a header, consults the configuration.
1612-
pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
1613-
self.has_header.unwrap_or(config_opt.catalog.has_header)
1612+
pub fn has_header(&self) -> Option<bool> {
1613+
self.has_header
16141614
}
16151615

16161616
/// The character separating values within a row.

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use arrow::array::RecordBatch;
4040
use arrow::csv::WriterBuilder;
4141
use arrow::datatypes::SchemaRef;
4242
use arrow::datatypes::{DataType, Field, Fields, Schema};
43-
use datafusion_common::config::{ConfigOptions, CsvOptions};
43+
use datafusion_common::config::CsvOptions;
4444
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
4545
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
4646
use datafusion_execution::TaskContext;
@@ -142,8 +142,8 @@ impl CsvFormat {
142142
}
143143

144144
/// True if the first line is a header.
145-
pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
146-
self.options.has_header(config_opt)
145+
pub fn has_header(&self) -> Option<bool> {
146+
self.options.has_header
147147
}
148148

149149
/// The character separating values within a row.
@@ -245,7 +245,9 @@ impl FileFormat for CsvFormat {
245245
conf,
246246
// If format options do not specify whether there is a header,
247247
// we consult configuration options.
248-
self.options.has_header(state.config_options()),
248+
self.options
249+
.has_header
250+
.unwrap_or(state.config_options().catalog.has_header),
249251
self.options.delimiter,
250252
self.options.quote,
251253
self.options.escape,
@@ -303,7 +305,10 @@ impl CsvFormat {
303305
while let Some(chunk) = stream.next().await.transpose()? {
304306
let format = arrow::csv::reader::Format::default()
305307
.with_header(
306-
self.options.has_header(state.config_options()) && first_chunk,
308+
self.options
309+
.has_header
310+
.unwrap_or(state.config_options().catalog.has_header)
311+
&& first_chunk,
307312
)
308313
.with_delimiter(self.options.delimiter);
309314

datafusion/core/src/datasource/stream.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use arrow_schema::SchemaRef;
3030
use async_trait::async_trait;
3131
use futures::StreamExt;
3232

33-
use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
33+
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3434
use datafusion_common_runtime::SpawnedTask;
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3636
use datafusion_expr::{CreateExternalTable, Expr, TableType};
@@ -58,11 +58,22 @@ impl TableProviderFactory for StreamTableFactory {
5858
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
5959
let location = cmd.location.clone();
6060
let encoding = cmd.file_type.parse()?;
61+
let header = if let Ok(opt) = cmd
62+
.options
63+
.get("format.has_header")
64+
.map(|has_header| bool::from_str(has_header))
65+
.transpose()
66+
{
67+
opt.unwrap_or(false)
68+
} else {
69+
return config_err!("format.has_header can either be true or false");
70+
};
6171

6272
let config = StreamConfig::new_file(schema, location.into())
6373
.with_encoding(encoding)
6474
.with_order(cmd.order_exprs.clone())
6575
.with_batch_size(state.config().batch_size())
76+
.with_header(header)
6677
.with_constraints(cmd.constraints.clone());
6778

6879
Ok(Arc::new(StreamTable(Arc::new(config))))

datafusion/core/tests/sql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
8585
c13 VARCHAR NOT NULL
8686
)
8787
STORED AS CSV
88-
LOCATION '{testdata}/csv/aggregate_test_100.csv
88+
LOCATION '{testdata}/csv/aggregate_test_100.csv'
8989
OPTIONS ('format.has_header' 'true')
9090
"
9191
))

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
238238
STORED AS CSV
239239
WITH ORDER (a ASC, b ASC)
240240
WITH ORDER (c ASC)
241-
LOCATION '../core/tests/data/window_2.csv';
241+
LOCATION '../core/tests/data/window_2.csv'
242242
OPTIONS ('format.has_header' 'true')";
243243

244244
let plan = ctx.state().create_logical_plan(query).await?;
@@ -268,7 +268,7 @@ async fn roundtrip_logical_plan_aggregation_with_pk() -> Result<()> {
268268
STORED AS CSV
269269
WITH ORDER (a ASC, b ASC)
270270
WITH ORDER (c ASC)
271-
LOCATION '../core/tests/data/window_2.csv';
271+
LOCATION '../core/tests/data/window_2.csv'
272272
OPTIONS ('format.has_header' 'true')",
273273
)
274274
.await?;

datafusion/sql/src/parser.rs

Lines changed: 20 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,10 @@ pub struct CopyToStatement {
101101
pub target: String,
102102
/// Partition keys
103103
pub partitioned_by: Vec<String>,
104-
/// Indicates whether there is a header row (e.g. CSV)
105-
pub has_header: bool,
106104
/// File type (Parquet, NDJSON, CSV etc.)
107105
pub stored_as: Option<String>,
108106
/// Target specific options
109-
pub options: Vec<(String, Value)>,
107+
pub options: HashMap<String, String>,
110108
}
111109

112110
impl fmt::Display for CopyToStatement {
@@ -129,7 +127,10 @@ impl fmt::Display for CopyToStatement {
129127
}
130128

131129
if !options.is_empty() {
132-
let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect();
130+
let opts: Vec<_> = options
131+
.iter()
132+
.map(|(k, v)| format!("'{k}' '{v}'"))
133+
.collect();
133134
write!(f, " OPTIONS ({})", opts.join(", "))?;
134135
}
135136

@@ -386,8 +387,7 @@ impl<'a> DFParser<'a> {
386387
stored_as: Option<String>,
387388
target: Option<String>,
388389
partitioned_by: Option<Vec<String>>,
389-
has_header: Option<bool>,
390-
options: Option<Vec<(String, Value)>>,
390+
options: Option<HashMap<String, String>>,
391391
}
392392

393393
let mut builder = Builder::default();
@@ -423,7 +423,7 @@ impl<'a> DFParser<'a> {
423423
}
424424
Keyword::OPTIONS => {
425425
ensure_not_set(&builder.options, "OPTIONS")?;
426-
builder.options = Some(self.parse_value_options()?);
426+
builder.options = Some(self.parse_string_options()?);
427427
}
428428
_ => {
429429
unreachable!()
@@ -451,9 +451,8 @@ impl<'a> DFParser<'a> {
451451
source,
452452
target,
453453
partitioned_by: builder.partitioned_by.unwrap_or(vec![]),
454-
has_header: builder.has_header.unwrap_or(false),
455454
stored_as: builder.stored_as,
456-
options: builder.options.unwrap_or(vec![]),
455+
options: builder.options.unwrap_or(HashMap::new()),
457456
}))
458457
}
459458

@@ -835,33 +834,6 @@ impl<'a> DFParser<'a> {
835834
}
836835
Ok(options)
837836
}
838-
839-
/// Parses (key value) style options into a map of String --> [`Value`].
840-
///
841-
/// Unlike [`Self::parse_string_options`], this method supports
842-
/// keywords as key names as well as multiple value types such as
843-
/// Numbers as well as Strings.
844-
fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>, ParserError> {
845-
let mut options = vec![];
846-
self.parser.expect_token(&Token::LParen)?;
847-
848-
loop {
849-
let key = self.parse_option_key()?;
850-
let value = self.parse_option_value()?;
851-
options.push((key, value));
852-
let comma = self.parser.consume_token(&Token::Comma);
853-
if self.parser.consume_token(&Token::RParen) {
854-
// allow a trailing comma, even though it's not in standard
855-
break;
856-
} else if !comma {
857-
return self.expected(
858-
"',' or ')' after option definition",
859-
self.parser.peek_token(),
860-
);
861-
}
862-
}
863-
Ok(options)
864-
}
865837
}
866838

867839
#[cfg(test)]
@@ -997,27 +969,6 @@ mod tests {
997969
});
998970
expect_parse_ok(sql, expected)?;
999971

1000-
// positive case: it is ok for case insensitive sql stmt with has_header option tokens
1001-
let sqls = vec![
1002-
"CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('FORMAT.HAS_HEADER' 'TRUE')",
1003-
"CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.has_header' 'true')",
1004-
];
1005-
for sql in sqls {
1006-
let expected = Statement::CreateExternalTable(CreateExternalTable {
1007-
name: "t".into(),
1008-
columns: vec![make_column_def("c1", DataType::Int(display))],
1009-
file_type: "CSV".to_string(),
1010-
location: "foo.csv".into(),
1011-
table_partition_cols: vec![],
1012-
order_exprs: vec![],
1013-
if_not_exists: false,
1014-
unbounded: false,
1015-
options: HashMap::from([("format.has_header".into(), "true".into())]),
1016-
constraints: vec![],
1017-
});
1018-
expect_parse_ok(sql, expected)?;
1019-
}
1020-
1021972
// positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` tokens
1022973
let sqls = vec![
1023974
("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
@@ -1357,9 +1308,8 @@ mod tests {
13571308
source: object_name("foo"),
13581309
target: "bar".to_string(),
13591310
partitioned_by: vec![],
1360-
has_header: false,
13611311
stored_as: Some("CSV".to_owned()),
1362-
options: vec![],
1312+
options: HashMap::new(),
13631313
});
13641314

13651315
assert_eq!(verified_stmt(sql), expected);
@@ -1393,9 +1343,8 @@ mod tests {
13931343
source: object_name("foo"),
13941344
target: "bar".to_string(),
13951345
partitioned_by: vec![],
1396-
has_header: false,
13971346
stored_as: Some("PARQUET".to_owned()),
1398-
options: vec![],
1347+
options: HashMap::new(),
13991348
});
14001349
let expected = Statement::Explain(ExplainStatement {
14011350
analyze,
@@ -1430,9 +1379,8 @@ mod tests {
14301379
source: CopyToSource::Query(query),
14311380
target: "bar".to_string(),
14321381
partitioned_by: vec![],
1433-
has_header: true,
14341382
stored_as: Some("CSV".to_owned()),
1435-
options: vec![],
1383+
options: HashMap::from([("format.has_header".into(), "true".into())]),
14361384
});
14371385
assert_eq!(verified_stmt(sql), expected);
14381386
Ok(())
@@ -1445,30 +1393,22 @@ mod tests {
14451393
source: object_name("foo"),
14461394
target: "bar".to_string(),
14471395
partitioned_by: vec![],
1448-
has_header: false,
14491396
stored_as: Some("CSV".to_owned()),
1450-
options: vec![(
1451-
"row_group_size".to_string(),
1452-
Value::Number("55".to_string(), false),
1453-
)],
1397+
options: HashMap::from([("row_group_size".into(), "55".into())]),
14541398
});
14551399
assert_eq!(verified_stmt(sql), expected);
14561400
Ok(())
14571401
}
14581402

14591403
#[test]
14601404
fn copy_to_partitioned_by() -> Result<(), ParserError> {
1461-
let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS (row_group_size 55)";
1405+
let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS ('row_group_size' '55')";
14621406
let expected = Statement::CopyTo(CopyToStatement {
14631407
source: object_name("foo"),
14641408
target: "bar".to_string(),
14651409
partitioned_by: vec!["a".to_string()],
1466-
has_header: false,
14671410
stored_as: Some("CSV".to_owned()),
1468-
options: vec![(
1469-
"row_group_size".to_string(),
1470-
Value::Number("55".to_string(), false),
1471-
)],
1411+
options: HashMap::from([("row_group_size".to_string(), "55".into())]),
14721412
});
14731413
assert_eq!(verified_stmt(sql), expected);
14741414
Ok(())
@@ -1478,18 +1418,12 @@ mod tests {
14781418
fn copy_to_multi_options() -> Result<(), ParserError> {
14791419
// order of options is preserved
14801420
let sql =
1481-
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)";
1421+
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' '55', 'format.compression' 'snappy')";
14821422

1483-
let expected_options = vec![
1484-
(
1485-
"format.row_group_size".to_string(),
1486-
Value::Number("55".to_string(), false),
1487-
),
1488-
(
1489-
"format.compression".to_string(),
1490-
Value::UnQuotedString("snappy".to_string()),
1491-
),
1492-
];
1423+
let expected_options = HashMap::from([
1424+
("format.row_group_size".to_string(), "55".into()),
1425+
("format.compression".to_string(), "snappy".into()),
1426+
]);
14931427

14941428
let mut statements = DFParser::parse_sql(sql).unwrap();
14951429
assert_eq!(statements.len(), 1);
@@ -1527,6 +1461,7 @@ mod tests {
15271461
/// `canonical` sql string
15281462
fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement {
15291463
let mut statements = DFParser::parse_sql(sql).unwrap();
1464+
println!("{:?}", statements[0]);
15301465
assert_eq!(statements.len(), 1);
15311466

15321467
if sql != canonical {

datafusion/sql/src/statement.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -849,36 +849,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
849849
}
850850
};
851851

852-
let mut options = HashMap::new();
853-
for (key, value) in statement.options {
854-
let value_string = match value {
855-
Value::SingleQuotedString(s) => s.to_string(),
856-
Value::DollarQuotedString(s) => s.to_string(),
857-
Value::UnQuotedString(s) => s.to_string(),
858-
Value::Number(_, _) | Value::Boolean(_) => value.to_string(),
859-
Value::DoubleQuotedString(_)
860-
| Value::EscapedStringLiteral(_)
861-
| Value::NationalStringLiteral(_)
862-
| Value::SingleQuotedByteStringLiteral(_)
863-
| Value::DoubleQuotedByteStringLiteral(_)
864-
| Value::RawStringLiteral(_)
865-
| Value::HexStringLiteral(_)
866-
| Value::Null
867-
| Value::Placeholder(_) => {
868-
return plan_err!("Unsupported Value in COPY statement {}", value);
869-
}
870-
};
871-
if !(&key.contains('.')) {
872-
// If config does not belong to any namespace, assume it is
873-
// a format option and apply the format prefix for backwards
874-
// compatibility.
875-
876-
let renamed_key = format!("format.{}", key);
877-
options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
878-
} else {
879-
options.insert(key.to_lowercase(), value_string.to_lowercase());
880-
}
881-
}
852+
let options = statement.options;
882853

883854
let file_type = if let Some(file_type) = statement.stored_as {
884855
FileType::from_str(&file_type).map_err(|_| {

datafusion/sqllogictest/test_files/create_external_table.slt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,6 @@ CREATE EXTERNAL TABLE t STORED AS CSV WITH HEADER LOCATION 'abc'
5252
statement error DataFusion error: SQL error: ParserError\("Expected BY, found: LOCATION"\)
5353
CREATE EXTERNAL TABLE t STORED AS CSV PARTITIONED LOCATION 'abc'
5454

55-
# Missing `TYPE` in COMPRESSION clause
56-
statement error DataFusion error: SQL error: ParserError\("Expected TYPE, found: LOCATION"\)
57-
CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION LOCATION 'abc'
58-
59-
# Invalid compression type
60-
statement error DataFusion error: SQL error: ParserError\("Unsupported file compression type ZZZ"\)
61-
CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah'
62-
6355
# Duplicate `STORED AS` clause
6456
statement error DataFusion error: SQL error: ParserError\("STORED AS specified more than once"\)
6557
CREATE EXTERNAL TABLE t STORED AS CSV STORED AS PARQUET LOCATION 'foo.parquet'

datafusion/sqllogictest/test_files/repartition_scan.slt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ statement ok
278278
CREATE EXTERNAL TABLE avro_table
279279
STORED AS AVRO
280280
LOCATION '../../testing/data/avro/simple_enum.avro'
281-
OPTIONS ('format.has_header' 'true');
282281

283282

284283
# It would be great to see the file read as "4" groups with even sizes (offsets) eventually

0 commit comments

Comments
 (0)