Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion crates/sail-common-datafusion/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use datafusion::physical_expr::{
create_physical_sort_exprs, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortRequirement,
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, Result};
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::Expr;

Expand Down Expand Up @@ -80,6 +80,74 @@ pub struct DeleteInfo {
pub options: Vec<HashMap<String, String>>,
}

#[derive(Debug, Clone)]
pub struct MergeTargetInfo {
pub table_name: Vec<String>,
pub path: String,
pub partition_by: Vec<String>,
pub options: Vec<HashMap<String, String>>,
}

#[derive(Debug, Clone)]
pub struct MergeAssignmentInfo {
pub column: String,
pub value: Arc<dyn PhysicalExpr>,
}

#[derive(Debug, Clone)]
pub enum MergeMatchedActionInfo {
Delete,
UpdateAll,
UpdateSet(Vec<MergeAssignmentInfo>),
}

#[derive(Debug, Clone)]
pub struct MergeMatchedClauseInfo {
pub condition: Option<Arc<dyn PhysicalExpr>>,
pub action: MergeMatchedActionInfo,
}

#[derive(Debug, Clone)]
pub enum MergeNotMatchedBySourceActionInfo {
Delete,
UpdateSet(Vec<MergeAssignmentInfo>),
}

#[derive(Debug, Clone)]
pub struct MergeNotMatchedBySourceClauseInfo {
pub condition: Option<Arc<dyn PhysicalExpr>>,
pub action: MergeNotMatchedBySourceActionInfo,
}

#[derive(Debug, Clone)]
pub enum MergeNotMatchedByTargetActionInfo {
InsertAll,
InsertColumns {
columns: Vec<String>,
values: Vec<Arc<dyn PhysicalExpr>>,
},
}

#[derive(Debug, Clone)]
pub struct MergeNotMatchedByTargetClauseInfo {
pub condition: Option<Arc<dyn PhysicalExpr>>,
pub action: MergeNotMatchedByTargetActionInfo,
}

#[derive(Debug, Clone)]
pub struct MergeInfo {
pub target: MergeTargetInfo,
pub target_input: Arc<dyn ExecutionPlan>,
pub source: Arc<dyn ExecutionPlan>,
pub target_schema: DFSchemaRef,
pub source_schema: DFSchemaRef,
pub on_condition: Arc<dyn PhysicalExpr>,
pub matched_clauses: Vec<MergeMatchedClauseInfo>,
pub not_matched_by_source_clauses: Vec<MergeNotMatchedBySourceClauseInfo>,
pub not_matched_by_target_clauses: Vec<MergeNotMatchedByTargetClauseInfo>,
pub with_schema_evolution: bool,
}

/// A trait for preparing physical execution for a specific format.
#[async_trait]
pub trait TableFormat: Send + Sync {
Expand Down Expand Up @@ -112,6 +180,19 @@ pub trait TableFormat: Send + Sync {
self.name()
)
}

/// Creates an `ExecutionPlan` for MERGE.
async fn create_merger(
&self,
ctx: &dyn Session,
info: MergeInfo,
) -> Result<Arc<dyn ExecutionPlan>> {
let _ = (ctx, info);
not_impl_err!(
"MERGE operation is not yet implemented for {} format",
self.name()
)
}
}

/// Thread-safe registry of available `TableFormat` implementations.
Expand Down
84 changes: 79 additions & 5 deletions crates/sail-common/src/spec/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,7 @@ pub enum CommandNode {
row_format: Option<TableRowFormat>,
options: Vec<(String, String)>,
},
MergeInto {
target: ObjectName,
with_schema_evolution: bool,
// TODO: add other fields
},
MergeInto(MergeInto),
SetVariable {
variable: String,
value: String,
Expand Down Expand Up @@ -516,6 +512,84 @@ pub enum CommandNode {
},
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MergeInto {
pub target: ObjectName,
pub target_alias: Option<Identifier>,
pub source: MergeSource,
pub on_condition: Expr,
pub clauses: Vec<MergeClause>,
pub with_schema_evolution: bool,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MergeSource {
Table {
name: ObjectName,
alias: Option<Identifier>,
},
Query {
input: Box<QueryPlan>,
alias: Option<Identifier>,
},
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MergeClause {
Matched(MergeMatchedClause),
NotMatchedBySource(MergeNotMatchedBySourceClause),
NotMatchedByTarget(MergeNotMatchedByTargetClause),
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MergeMatchedClause {
pub condition: Option<Expr>,
pub action: MergeMatchedAction,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MergeMatchedAction {
Delete,
UpdateAll,
UpdateSet(Vec<(ObjectName, Expr)>),
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MergeNotMatchedBySourceClause {
pub condition: Option<Expr>,
pub action: MergeNotMatchedBySourceAction,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MergeNotMatchedBySourceAction {
Delete,
UpdateSet(Vec<(ObjectName, Expr)>),
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MergeNotMatchedByTargetClause {
pub condition: Option<Expr>,
pub action: MergeNotMatchedByTargetAction,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MergeNotMatchedByTargetAction {
InsertAll,
InsertColumns {
columns: Vec<ObjectName>,
values: Vec<Expr>,
},
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", rename_all_fields = "camelCase")]
#[allow(clippy::large_enum_variant)]
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-delta-lake/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use url::Url;
use crate::kernel::snapshot::LogDataHandler;
use crate::kernel::{DeltaResult, DeltaTableError};
use crate::table::DeltaTableState;
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
pub(crate) const PATH_COLUMN: &str = "__sail_file_path";

pub mod actions;
pub mod expressions;
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-delta-lake/src/datasource/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub fn build_file_scan_config(
table_partition_cols_schema.push(Field::new(
file_column_name.clone(),
field_name_datatype,
false,
true,
));
}

Expand Down
6 changes: 4 additions & 2 deletions crates/sail-delta-lake/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::kernel::DeltaOperation;
mod commit_exec;
mod expr_adapter;
pub mod find_files_exec;
pub mod plan_builder;
mod remove_actions_exec;
mod scan_by_adds_exec;
mod utils;
Expand All @@ -42,7 +41,10 @@ mod writer_exec;
pub use commit_exec::DeltaCommitExec;
pub use expr_adapter::DeltaPhysicalExprAdapterFactory;
pub use find_files_exec::DeltaFindFilesExec;
pub use plan_builder::{DeltaDeletePlanBuilder, DeltaPlanBuilder};
pub mod planner;
pub use planner::{
plan_delete, plan_merge, plan_update, DeltaPhysicalPlanner, DeltaTableConfig, PlannerContext,
};
pub use remove_actions_exec::DeltaRemoveActionsExec;
pub use scan_by_adds_exec::DeltaScanByAddsExec;
pub(crate) use utils::join_batches_with_add_actions;
Expand Down
Loading