Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ impl DefaultPhysicalPlanner {
file_type,
partition_by,
options: source_option_tuples,
output_schema: _,
}) => {
let original_url = output_url.clone();
let input_exec = children.one()?;
Expand Down
26 changes: 26 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6137,3 +6137,29 @@ async fn test_dataframe_macro() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_copy_schema() -> Result<()> {
    let tmp_dir = TempDir::new()?;

    let session_state = SessionStateBuilder::new_with_default_features().build();

    let session_ctx = SessionContext::new_with_state(session_state);

    // Source table: a single nullable Int64 column "a" with no rows — only the
    // schema matters for this test.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    // Create and register the source table with the provided schema and data
    let source_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
    session_ctx.register_table("source_table", source_table.clone())?;

    let target_path = tmp_dir.path().join("target.csv");

    // Use `{}` (Display), not `{:?}` (Debug): Debug-formatting a &str wraps it
    // in an extra pair of double quotes, which would end up inside the SQL
    // single-quoted string literal and corrupt the target path.
    let query = format!(
        "COPY source_table TO '{}' STORED AS csv",
        target_path.to_str().unwrap()
    );

    // Verify the logical plan's schema for COPY agrees with the physical
    // plan's schema.
    let result = session_ctx.sql(&query).await?;
    assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
    Ok(())
}
6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,13 @@ impl LogicalPlanBuilder {
options: HashMap<String, String>,
partition_by: Vec<String>,
) -> Result<Self> {
Ok(Self::new(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
output_url,
partition_by,
file_type,
options,
})))
))))
}

/// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
file_type,
partition_by: _,
options,
output_schema: _,
}) => {
let op_str = options
.iter()
Expand Down
23 changes: 23 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct CopyTo {
pub file_type: Arc<dyn FileType>,
/// SQL Options that can affect the formats
pub options: HashMap<String, String>,
/// The schema of the output (a single column "count")
pub output_schema: DFSchemaRef,
}

impl Debug for CopyTo {
Expand All @@ -50,6 +52,7 @@ impl Debug for CopyTo {
.field("partition_by", &self.partition_by)
.field("file_type", &"...")
.field("options", &self.options)
.field("output_schema", &self.output_schema)
.finish_non_exhaustive()
}
}
Expand Down Expand Up @@ -89,6 +92,26 @@ impl Hash for CopyTo {
}
}

impl CopyTo {
    /// Builds a `CopyTo` node that writes `input` to `output_url` using the
    /// given `file_type`, optionally partitioned by the `partition_by`
    /// columns and configured via the SQL `options` map.
    pub fn new(
        input: Arc<LogicalPlan>,
        output_url: String,
        partition_by: Vec<String>,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
    ) -> Self {
        // COPY reports how many rows were written, so the output schema is
        // always the single-column "count" schema.
        let output_schema = make_count_schema();
        Self {
            input,
            output_url,
            partition_by,
            file_type,
            options,
            output_schema,
        }
    }
}

/// Modifies the content of a database
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
Expand Down
17 changes: 9 additions & 8 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl LogicalPlan {
output_schema
}
LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
Expand Down Expand Up @@ -809,16 +809,17 @@ impl LogicalPlan {
file_type,
options,
partition_by,
output_schema: _,
}) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: output_url.clone(),
file_type: Arc::clone(file_type),
options: options.clone(),
partition_by: partition_by.clone(),
}))
Ok(LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
output_url.clone(),
partition_by.clone(),
Arc::clone(file_type),
options.clone(),
)))
}
LogicalPlan::Values(Values { schema, .. }) => {
self.assert_no_inputs(inputs)?;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,15 @@ impl TreeNode for LogicalPlan {
partition_by,
file_type,
options,
output_schema,
}) => input.map_elements(f)?.update_data(|input| {
LogicalPlan::Copy(CopyTo {
input,
output_url,
partition_by,
file_type,
options,
output_schema,
})
}),
LogicalPlan::Ddl(ddl) => {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,13 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
);

Ok(LogicalPlan::Copy(dml::CopyTo {
input: Arc::new(input),
output_url: copy.output_url.clone(),
partition_by: copy.partition_by.clone(),
Ok(LogicalPlan::Copy(dml::CopyTo::new(
Arc::new(input),
copy.output_url.clone(),
copy.partition_by.clone(),
file_type,
options: Default::default(),
}))
Default::default(),
)))
}
LogicalPlanType::Unnest(unnest) => {
let input: LogicalPlan =
Expand Down
72 changes: 36 additions & 36 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,13 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
let input = create_csv_scan(&ctx).await?;
let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.csv".to_string(),
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.csv".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
options: Default::default(),
});
Default::default(),
));

let codec = CsvLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
Expand Down Expand Up @@ -469,13 +469,13 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
ParquetFormatFactory::new_with_options(parquet_format),
));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.parquet".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
options: Default::default(),
});
Default::default(),
));

let codec = ParquetLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
Expand All @@ -501,13 +501,13 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {

let file_type = format_as_file_type(Arc::new(ArrowFormatFactory::new()));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.arrow".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
options: Default::default(),
});
Default::default(),
));

let codec = ArrowLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
Expand Down Expand Up @@ -548,13 +548,13 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
csv_format.clone(),
)));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.csv".to_string(),
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.csv".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
options: Default::default(),
});
Default::default(),
));

let codec = CsvLogicalExtensionCodec {};
let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
Expand Down Expand Up @@ -614,13 +614,13 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
json_format.clone(),
)));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.json".to_string(),
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.json".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
options: Default::default(),
});
Default::default(),
));

// Assume JsonLogicalExtensionCodec is implemented similarly to CsvLogicalExtensionCodec
let codec = JsonLogicalExtensionCodec {};
Expand Down Expand Up @@ -686,13 +686,13 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
ParquetFormatFactory::new_with_options(parquet_format.clone()),
));

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
let plan = LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
"test.parquet".to_string(),
vec!["a".to_string(), "b".to_string(), "c".to_string()],
file_type,
options: Default::default(),
});
Default::default(),
));

// Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec
let codec = ParquetLogicalExtensionCodec {};
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,13 +1388,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.map(|f| f.name().to_owned())
.collect();

Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: statement.target,
file_type,
Ok(LogicalPlan::Copy(CopyTo::new(
Arc::new(input),
statement.target,
partition_by,
options: options_map,
}))
file_type,
options_map,
)))
}

fn build_order_by(
Expand Down