Skip to content

Commit 3b5387b

Browse files
author
Bert Vermeiren
committed
Fix: CopyTo logical plan outputs 1 column
1 parent 649a36f commit 3b5387b

File tree

10 files changed

+112
-59
lines changed

10 files changed

+112
-59
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -504,6 +504,7 @@ impl DefaultPhysicalPlanner {
504504
file_type,
505505
partition_by,
506506
options: source_option_tuples,
507+
output_schema: _,
507508
}) => {
508509
let original_url = output_url.clone();
509510
let input_exec = children.one()?;

datafusion/core/tests/dataframe/mod.rs

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6137,3 +6137,29 @@ async fn test_dataframe_macro() -> Result<()> {
61376137

61386138
Ok(())
61396139
}
6140+
6141+
#[tokio::test]
6142+
async fn test_copy_schema() -> Result<()> {
6143+
let tmp_dir = TempDir::new()?;
6144+
6145+
let session_state = SessionStateBuilder::new_with_default_features().build();
6146+
6147+
let session_ctx = SessionContext::new_with_state(session_state);
6148+
6149+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
6150+
6151+
// Create and register the source table with the provided schema and data
6152+
let source_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
6153+
session_ctx.register_table("source_table", source_table.clone())?;
6154+
6155+
let target_path = tmp_dir.path().join("target.csv");
6156+
6157+
let query = format!(
6158+
"COPY source_table TO '{:?}' STORED AS csv",
6159+
target_path.to_str().unwrap()
6160+
);
6161+
6162+
let result = session_ctx.sql(&query).await?;
6163+
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
6164+
Ok(())
6165+
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -407,13 +407,13 @@ impl LogicalPlanBuilder {
407407
options: HashMap<String, String>,
408408
partition_by: Vec<String>,
409409
) -> Result<Self> {
410-
Ok(Self::new(LogicalPlan::Copy(CopyTo {
411-
input: Arc::new(input),
410+
Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
411+
Arc::new(input),
412412
output_url,
413413
partition_by,
414414
file_type,
415415
options,
416-
})))
416+
))))
417417
}
418418

419419
/// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.

datafusion/expr/src/logical_plan/display.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -426,6 +426,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
426426
file_type,
427427
partition_by: _,
428428
options,
429+
output_schema: _,
429430
}) => {
430431
let op_str = options
431432
.iter()

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ pub struct CopyTo {
4040
pub file_type: Arc<dyn FileType>,
4141
/// SQL Options that can affect the formats
4242
pub options: HashMap<String, String>,
43+
pub output_schema: DFSchemaRef,
4344
}
4445

4546
impl Debug for CopyTo {
@@ -50,6 +51,7 @@ impl Debug for CopyTo {
5051
.field("partition_by", &self.partition_by)
5152
.field("file_type", &"...")
5253
.field("options", &self.options)
54+
.field("output_schema", &self.output_schema)
5355
.finish_non_exhaustive()
5456
}
5557
}
@@ -89,6 +91,26 @@ impl Hash for CopyTo {
8991
}
9092
}
9193

94+
impl CopyTo {
95+
pub fn new(
96+
input: Arc<LogicalPlan>,
97+
output_url: String,
98+
partition_by: Vec<String>,
99+
file_type: Arc<dyn FileType>,
100+
options: HashMap<String, String>,
101+
) -> Self {
102+
Self {
103+
input,
104+
output_url,
105+
partition_by,
106+
file_type,
107+
options,
108+
// The output schema is always a single column with the number of rows copied
109+
output_schema: make_count_schema(),
110+
}
111+
}
112+
}
113+
92114
/// Modifies the content of a database
93115
///
94116
/// This operator is used to perform DML operations such as INSERT, DELETE,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -344,7 +344,7 @@ impl LogicalPlan {
344344
output_schema
345345
}
346346
LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
347-
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
347+
LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
348348
LogicalPlan::Ddl(ddl) => ddl.schema(),
349349
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
350350
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
@@ -809,16 +809,17 @@ impl LogicalPlan {
809809
file_type,
810810
options,
811811
partition_by,
812+
output_schema: _,
812813
}) => {
813814
self.assert_no_expressions(expr)?;
814815
let input = self.only_input(inputs)?;
815-
Ok(LogicalPlan::Copy(CopyTo {
816-
input: Arc::new(input),
817-
output_url: output_url.clone(),
818-
file_type: Arc::clone(file_type),
819-
options: options.clone(),
820-
partition_by: partition_by.clone(),
821-
}))
816+
Ok(LogicalPlan::Copy(CopyTo::new(
817+
Arc::new(input),
818+
output_url.clone(),
819+
partition_by.clone(),
820+
Arc::clone(file_type),
821+
options.clone(),
822+
)))
822823
}
823824
LogicalPlan::Values(Values { schema, .. }) => {
824825
self.assert_no_inputs(inputs)?;

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -243,13 +243,15 @@ impl TreeNode for LogicalPlan {
243243
partition_by,
244244
file_type,
245245
options,
246+
output_schema,
246247
}) => input.map_elements(f)?.update_data(|input| {
247248
LogicalPlan::Copy(CopyTo {
248249
input,
249250
output_url,
250251
partition_by,
251252
file_type,
252253
options,
254+
output_schema,
253255
})
254256
}),
255257
LogicalPlan::Ddl(ddl) => {

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -905,13 +905,13 @@ impl AsLogicalPlan for LogicalPlanNode {
905905
extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
906906
);
907907

908-
Ok(LogicalPlan::Copy(dml::CopyTo {
909-
input: Arc::new(input),
910-
output_url: copy.output_url.clone(),
911-
partition_by: copy.partition_by.clone(),
908+
Ok(LogicalPlan::Copy(dml::CopyTo::new(
909+
Arc::new(input),
910+
copy.output_url.clone(),
911+
copy.partition_by.clone(),
912912
file_type,
913-
options: Default::default(),
914-
}))
913+
Default::default(),
914+
)))
915915
}
916916
LogicalPlanType::Unnest(unnest) => {
917917
let input: LogicalPlan =

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 36 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -429,13 +429,13 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
429429
let input = create_csv_scan(&ctx).await?;
430430
let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
431431

432-
let plan = LogicalPlan::Copy(CopyTo {
433-
input: Arc::new(input),
434-
output_url: "test.csv".to_string(),
435-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
432+
let plan = LogicalPlan::Copy(CopyTo::new(
433+
Arc::new(input),
434+
"test.csv".to_string(),
435+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
436436
file_type,
437-
options: Default::default(),
438-
});
437+
Default::default(),
438+
));
439439

440440
let codec = CsvLogicalExtensionCodec {};
441441
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -469,13 +469,13 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
469469
ParquetFormatFactory::new_with_options(parquet_format),
470470
));
471471

472-
let plan = LogicalPlan::Copy(CopyTo {
473-
input: Arc::new(input),
474-
output_url: "test.parquet".to_string(),
472+
let plan = LogicalPlan::Copy(CopyTo::new(
473+
Arc::new(input),
474+
"test.parquet".to_string(),
475+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
475476
file_type,
476-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
477-
options: Default::default(),
478-
});
477+
Default::default(),
478+
));
479479

480480
let codec = ParquetLogicalExtensionCodec {};
481481
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -501,13 +501,13 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
501501

502502
let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));
503503

504-
let plan = LogicalPlan::Copy(CopyTo {
505-
input: Arc::new(input),
506-
output_url: "test.arrow".to_string(),
507-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
504+
let plan = LogicalPlan::Copy(CopyTo::new(
505+
Arc::new(input),
506+
"test.arrow".to_string(),
507+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
508508
file_type,
509-
options: Default::default(),
510-
});
509+
Default::default(),
510+
));
511511

512512
let codec = ArrowLogicalExtensionCodec {};
513513
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -548,13 +548,13 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
548548
csv_format.clone(),
549549
)));
550550

551-
let plan = LogicalPlan::Copy(CopyTo {
552-
input: Arc::new(input),
553-
output_url: "test.csv".to_string(),
554-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
551+
let plan = LogicalPlan::Copy(CopyTo::new(
552+
Arc::new(input),
553+
"test.csv".to_string(),
554+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
555555
file_type,
556-
options: Default::default(),
557-
});
556+
Default::default(),
557+
));
558558

559559
let codec = CsvLogicalExtensionCodec {};
560560
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -614,13 +614,13 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
614614
json_format.clone(),
615615
)));
616616

617-
let plan = LogicalPlan::Copy(CopyTo {
618-
input: Arc::new(input),
619-
output_url: "test.json".to_string(),
620-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
617+
let plan = LogicalPlan::Copy(CopyTo::new(
618+
Arc::new(input),
619+
"test.json".to_string(),
620+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
621621
file_type,
622-
options: Default::default(),
623-
});
622+
Default::default(),
623+
));
624624

625625
// Assume JsonLogicalExtensionCodec is implemented similarly to CsvLogicalExtensionCodec
626626
let codec = JsonLogicalExtensionCodec {};
@@ -686,13 +686,13 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
686686
ParquetFormatFactory::new_with_options(parquet_format.clone()),
687687
));
688688

689-
let plan = LogicalPlan::Copy(CopyTo {
690-
input: Arc::new(input),
691-
output_url: "test.parquet".to_string(),
692-
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
689+
let plan = LogicalPlan::Copy(CopyTo::new(
690+
Arc::new(input),
691+
"test.parquet".to_string(),
692+
vec!["a".to_string(), "b".to_string(), "c".to_string()],
693693
file_type,
694-
options: Default::default(),
695-
});
694+
Default::default(),
695+
));
696696

697697
// Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec
698698
let codec = ParquetLogicalExtensionCodec {};

datafusion/sql/src/statement.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1388,13 +1388,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
13881388
.map(|f| f.name().to_owned())
13891389
.collect();
13901390

1391-
Ok(LogicalPlan::Copy(CopyTo {
1392-
input: Arc::new(input),
1393-
output_url: statement.target,
1394-
file_type,
1391+
Ok(LogicalPlan::Copy(CopyTo::new(
1392+
Arc::new(input),
1393+
statement.target,
13951394
partition_by,
1396-
options: options_map,
1397-
}))
1395+
file_type,
1396+
options_map,
1397+
)))
13981398
}
13991399

14001400
fn build_order_by(

0 commit comments

Comments
 (0)