
Commit 2093551

Fix: CopyTo logical plan outputs 1 column (apache#16705)

Authored-by: Bert Vermeiren (bert-beyondloops)
Co-authored-by: Bert Vermeiren <bert.vermeiren@datadobi.com>

1 parent: 2afb681

File tree: 10 files changed, +113 −59 lines
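
As context for the diffs below, here is a minimal sketch (not part of the commit) of the behavior this change establishes: the logical plan for a COPY statement now reports the same single-column "count" schema that the physical plan already produced, instead of the input table's schema. The table name, file path, and the exact "count" field are illustrative assumptions based on the doc comment added in dml.rs.

    // Sketch only: assumes a DataFusion build that contains this commit.
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql("CREATE TABLE source_table AS VALUES (1), (2)").await?;

        // COPY is lazy here; `sql` only builds the LogicalPlan::Copy node.
        let df = ctx
            .sql("COPY source_table TO '/tmp/out.csv' STORED AS CSV")
            .await?;

        // Previously this reported source_table's schema; now it is one "count" column.
        assert_eq!(df.schema().fields().len(), 1);
        Ok(())
    }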

datafusion/core/src/physical_planner.rs
Lines changed: 1 addition & 0 deletions

@@ -505,6 +505,7 @@ impl DefaultPhysicalPlanner {
                 file_type,
                 partition_by,
                 options: source_option_tuples,
+                output_schema: _,
             }) => {
                 let original_url = output_url.clone();
                 let input_exec = children.one()?;

datafusion/core/tests/dataframe/mod.rs
Lines changed: 26 additions & 0 deletions

@@ -6137,3 +6137,29 @@ async fn test_dataframe_macro() -> Result<()> {

     Ok(())
 }
+
+#[tokio::test]
+async fn test_copy_schema() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+
+    let session_state = SessionStateBuilder::new_with_default_features().build();
+
+    let session_ctx = SessionContext::new_with_state(session_state);
+
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+
+    // Create and register the source table with the provided schema and data
+    let source_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
+    session_ctx.register_table("source_table", source_table.clone())?;
+
+    let target_path = tmp_dir.path().join("target.csv");
+
+    let query = format!(
+        "COPY source_table TO '{:?}' STORED AS csv",
+        target_path.to_str().unwrap()
+    );
+
+    let result = session_ctx.sql(&query).await?;
+    assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
+    Ok(())
+}
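
The helper assert_logical_expr_schema_eq_physical_expr_schema is defined elsewhere in this test module and is not shown in this diff. A hedged reconstruction of what it likely checks (the body, the Schema::from conversion, and the exact comparison are assumptions; only the name comes from the test above):

    // Hypothetical sketch, not the actual helper: plan the DataFrame physically
    // and require the physical output schema to match the logical plan's schema,
    // which this commit makes a single "count" column for COPY.
    async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Result<()> {
        let logical_schema = Schema::from(df.schema());
        let physical_plan = df.create_physical_plan().await?;
        assert_eq!(&logical_schema, physical_plan.schema().as_ref());
        Ok(())
    }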

datafusion/expr/src/logical_plan/builder.rs
Lines changed: 3 additions & 3 deletions

@@ -407,13 +407,13 @@ impl LogicalPlanBuilder {
         options: HashMap<String, String>,
         partition_by: Vec<String>,
     ) -> Result<Self> {
-        Ok(Self::new(LogicalPlan::Copy(CopyTo {
-            input: Arc::new(input),
+        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
+            Arc::new(input),
             output_url,
             partition_by,
             file_type,
             options,
-        })))
+        ))))
     }

     /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
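
For reference, a hedged sketch of calling the builder path changed above. The trailing parameter order of copy_to (options, then partition_by) is visible in the hunk; the leading parameters, the empty-relation input, and the module paths for CsvFormatFactory and format_as_file_type are assumptions drawn from the test code later in this commit:

    // Sketch only: build a COPY plan via LogicalPlanBuilder::copy_to and check
    // that the resulting plan now advertises a single output column.
    use std::{collections::HashMap, sync::Arc};

    use datafusion::datasource::file_format::{csv::CsvFormatFactory, format_as_file_type};
    use datafusion::error::Result;
    use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};

    fn build_copy_plan() -> Result<LogicalPlan> {
        let input = LogicalPlanBuilder::empty(false).build()?;
        let plan = LogicalPlanBuilder::copy_to(
            input,
            "out.csv".to_string(),
            format_as_file_type(Arc::new(CsvFormatFactory::new())),
            HashMap::new(),
            vec![],
        )?
        .build()?;

        // With this commit, a Copy plan's schema is the one-column "count" schema.
        assert_eq!(plan.schema().fields().len(), 1);
        Ok(plan)
    }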

datafusion/expr/src/logical_plan/display.rs
Lines changed: 1 addition & 0 deletions

@@ -426,6 +426,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 file_type,
                 partition_by: _,
                 options,
+                output_schema: _,
             }) => {
                 let op_str = options
                     .iter()

datafusion/expr/src/logical_plan/dml.rs
Lines changed: 23 additions & 0 deletions

@@ -40,6 +40,8 @@ pub struct CopyTo {
     pub file_type: Arc<dyn FileType>,
     /// SQL Options that can affect the formats
     pub options: HashMap<String, String>,
+    /// The schema of the output (a single column "count")
+    pub output_schema: DFSchemaRef,
 }

 impl Debug for CopyTo {
@@ -50,6 +52,7 @@ impl Debug for CopyTo {
             .field("partition_by", &self.partition_by)
             .field("file_type", &"...")
             .field("options", &self.options)
+            .field("output_schema", &self.output_schema)
             .finish_non_exhaustive()
     }
 }
@@ -89,6 +92,26 @@ impl Hash for CopyTo {
     }
 }

+impl CopyTo {
+    pub fn new(
+        input: Arc<LogicalPlan>,
+        output_url: String,
+        partition_by: Vec<String>,
+        file_type: Arc<dyn FileType>,
+        options: HashMap<String, String>,
+    ) -> Self {
+        Self {
+            input,
+            output_url,
+            partition_by,
+            file_type,
+            options,
+            // The output schema is always a single column "count" with the number of rows copied
+            output_schema: make_count_schema(),
+        }
+    }
+}
+
 /// Modifies the content of a database
 ///
 /// This operator is used to perform DML operations such as INSERT, DELETE,
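
make_count_schema() is called by CopyTo::new above but is not defined in this diff, so it presumably already exists in dml.rs for DML row counts. A hedged sketch of the shape it produces; the UInt64 type and non-nullable flag are assumptions, only the single "count" column comes from the doc comment:

    // Hypothetical sketch, not the actual function body: a DFSchemaRef with a
    // single "count" column, matching the doc comment on CopyTo::output_schema.
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, DFSchemaRef};

    fn make_count_schema() -> DFSchemaRef {
        let arrow_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
        Arc::new(DFSchema::try_from(arrow_schema).expect("valid single-column schema"))
    }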

datafusion/expr/src/logical_plan/plan.rs
Lines changed: 9 additions & 8 deletions

@@ -345,7 +345,7 @@ impl LogicalPlan {
                 output_schema
             }
             LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
-            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
+            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
             LogicalPlan::Ddl(ddl) => ddl.schema(),
             LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
             LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
@@ -810,16 +810,17 @@ impl LogicalPlan {
                 file_type,
                 options,
                 partition_by,
+                output_schema: _,
             }) => {
                 self.assert_no_expressions(expr)?;
                 let input = self.only_input(inputs)?;
-                Ok(LogicalPlan::Copy(CopyTo {
-                    input: Arc::new(input),
-                    output_url: output_url.clone(),
-                    file_type: Arc::clone(file_type),
-                    options: options.clone(),
-                    partition_by: partition_by.clone(),
-                }))
+                Ok(LogicalPlan::Copy(CopyTo::new(
+                    Arc::new(input),
+                    output_url.clone(),
+                    partition_by.clone(),
+                    Arc::clone(file_type),
+                    options.clone(),
+                )))
             }
             LogicalPlan::Values(Values { schema, .. }) => {
                 self.assert_no_inputs(inputs)?;

datafusion/expr/src/logical_plan/tree_node.rs
Lines changed: 2 additions & 0 deletions

@@ -243,13 +243,15 @@ impl TreeNode for LogicalPlan {
                 partition_by,
                 file_type,
                 options,
+                output_schema,
             }) => input.map_elements(f)?.update_data(|input| {
                 LogicalPlan::Copy(CopyTo {
                     input,
                     output_url,
                     partition_by,
                     file_type,
                     options,
+                    output_schema,
                 })
             }),
             LogicalPlan::Ddl(ddl) => {

datafusion/proto/src/logical_plan/mod.rs
Lines changed: 6 additions & 6 deletions

@@ -905,13 +905,13 @@ impl AsLogicalPlan for LogicalPlanNode {
                     extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
                 );

-                Ok(LogicalPlan::Copy(dml::CopyTo {
-                    input: Arc::new(input),
-                    output_url: copy.output_url.clone(),
-                    partition_by: copy.partition_by.clone(),
+                Ok(LogicalPlan::Copy(dml::CopyTo::new(
+                    Arc::new(input),
+                    copy.output_url.clone(),
+                    copy.partition_by.clone(),
                     file_type,
-                    options: Default::default(),
-                }))
+                    Default::default(),
+                )))
             }
             LogicalPlanType::Unnest(unnest) => {
                 let input: LogicalPlan =

datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Lines changed: 36 additions & 36 deletions

@@ -429,13 +429,13 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
     let input = create_csv_scan(&ctx).await?;
     let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.csv".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.csv".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     let codec = CsvLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -469,13 +469,13 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
         ParquetFormatFactory::new_with_options(parquet_format),
     ));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.parquet".to_string(),
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.parquet".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     let codec = ParquetLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -501,13 +501,13 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {

     let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.arrow".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.arrow".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     let codec = ArrowLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -548,13 +548,13 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
         csv_format.clone(),
     )));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.csv".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.csv".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     let codec = CsvLogicalExtensionCodec {};
     let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
@@ -614,13 +614,13 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
         json_format.clone(),
     )));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.json".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.json".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     // Assume JsonLogicalExtensionCodec is implemented similarly to CsvLogicalExtensionCodec
     let codec = JsonLogicalExtensionCodec {};
@@ -686,13 +686,13 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
         ParquetFormatFactory::new_with_options(parquet_format.clone()),
     ));

-    let plan = LogicalPlan::Copy(CopyTo {
-        input: Arc::new(input),
-        output_url: "test.parquet".to_string(),
-        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
+    let plan = LogicalPlan::Copy(CopyTo::new(
+        Arc::new(input),
+        "test.parquet".to_string(),
+        vec!["a".to_string(), "b".to_string(), "c".to_string()],
         file_type,
-        options: Default::default(),
-    });
+        Default::default(),
+    ));

     // Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec
     let codec = ParquetLogicalExtensionCodec {};

datafusion/sql/src/statement.rs
Lines changed: 6 additions & 6 deletions

@@ -1388,13 +1388,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             .map(|f| f.name().to_owned())
            .collect();

-        Ok(LogicalPlan::Copy(CopyTo {
-            input: Arc::new(input),
-            output_url: statement.target,
-            file_type,
+        Ok(LogicalPlan::Copy(CopyTo::new(
+            Arc::new(input),
+            statement.target,
             partition_by,
-            options: options_map,
-        }))
+            file_type,
+            options_map,
+        )))
     }

     fn build_order_by(
