Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: update operations to use delta scan #1639

Merged
merged 29 commits into from
Oct 22, 2023
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e713ab3
:WIP: Delta scan changes
Blajda Aug 6, 2023
db66761
reintroduce projection for find files
Blajda Aug 6, 2023
2c384d9
Change path column to dict encoding
Blajda Aug 9, 2023
df1d2ee
impl serde for scan
Blajda Aug 12, 2023
40403bd
merge with main
Blajda Sep 12, 2023
0c39276
refactor merge to use delta scan
Blajda Sep 12, 2023
27178ab
Merge remote-tracking branch 'origin/main' into scan-refactor
Blajda Sep 17, 2023
fd41ac8
change schema to use
Blajda Oct 1, 2023
01e7ee2
resolve merge conflicts with main
Blajda Oct 1, 2023
254e560
resolve merge conflicts with main
Blajda Oct 1, 2023
5855bb0
fix integration tests
Blajda Oct 1, 2023
9ba6258
fix integration tests
Blajda Oct 1, 2023
6b5d040
fix merge operation usage with table alias
Blajda Oct 6, 2023
b2ae59a
clean up
Blajda Oct 7, 2023
e3bf7cd
Merge remote-tracking branch 'origin/main' into merge-predicate-parse…
Blajda Oct 7, 2023
9354c4c
update docs
Blajda Oct 7, 2023
c3eb0cb
allow using target alias for merge op target
Blajda Oct 7, 2023
b80cde0
Merge remote-tracking branch 'origin/main' into merge-predicate-parse…
Blajda Oct 7, 2023
49b7857
additional alias test
Blajda Oct 7, 2023
ce1d02c
merge with main
Blajda Oct 8, 2023
662af12
refactor config builder + new table provider
Blajda Oct 9, 2023
df2e431
Merge remote-tracking branch 'origin/main' into merge-predicate-parse…
Blajda Oct 9, 2023
4b87e93
Merge remote-tracking branch 'origin/main' into scan-refactor
Blajda Oct 9, 2023
345da95
merge with merge-expr branch
Blajda Oct 9, 2023
ac764d6
Merge remote-tracking branch 'origin/main' into scan-refactor
Blajda Oct 13, 2023
5df2f6f
Merge branch 'main' into scan-refactor
Blajda Oct 20, 2023
f6b3ab4
Update rust/src/delta_datafusion/mod.rs
Blajda Oct 21, 2023
21300e5
Update rust/src/delta_datafusion/mod.rs
Blajda Oct 21, 2023
e372316
Merge branch 'main' into scan-refactor
Blajda Oct 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
allow using target alias for merge op target
  • Loading branch information
Blajda committed Oct 7, 2023
commit c3eb0cba0304d11092f8ea145b20b2d940c31c2d
162 changes: 140 additions & 22 deletions rust/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,47 @@ impl MergeOperation {
config: MergeOperationConfig,
schema: &DFSchema,
state: &SessionState,
target_alias: &Option<String>,
) -> DeltaResult<MergeOperation> {
let mut ops = HashMap::with_capacity(config.operations.capacity());

for (column, expression) in config.operations.into_iter() {
// Normalize the column name to contain the target alias. If a table reference was provided ensure it's the target.
let column = match target_alias {
Some(alias) => {
let r = TableReference::bare(alias.to_owned());
match column {
Column {
relation: None,
name,
} => Column {
relation: Some(r),
name,
},
Column {
relation: Some(TableReference::Bare { table }),
name,
} => {
if table.eq(alias) {
Column {
relation: Some(r),
name,
}
} else {
return Err(DeltaTableError::Generic(
"Column must reference column in Delta table".into(),
));
}
}
_ => {
return Err(DeltaTableError::Generic(
"Column must reference column in Delta table".into(),
))
}
}
}
None => column,
};
ops.insert(column, into_expr(expression, schema, state)?);
}

Expand Down Expand Up @@ -635,17 +672,17 @@ async fn execute(

let match_operations: Vec<MergeOperation> = match_operations
.into_iter()
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state))
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state, &target_alias))
.collect::<Result<Vec<MergeOperation>, DeltaTableError>>()?;

let not_match_target_operations: Vec<MergeOperation> = not_match_target_operations
.into_iter()
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state))
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state, &target_alias))
.collect::<Result<Vec<MergeOperation>, DeltaTableError>>()?;

let not_match_source_operations: Vec<MergeOperation> = not_match_source_operations
.into_iter()
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state))
.map(|op| MergeOperation::try_from(op, &join_schema_df, &state, &target_alias))
.collect::<Result<Vec<MergeOperation>, DeltaTableError>>()?;

let predicate_expr = create_physical_expr(
Expand Down Expand Up @@ -808,7 +845,7 @@ async fn execute(
let mut projection_map = HashMap::new();
let mut f = project_schema_df.fields().clone();

for field in snapshot.schema().unwrap().get_fields() {
for delta_field in snapshot.schema().unwrap().get_fields() {
let mut when_expr = Vec::with_capacity(operations_size);
let mut then_expr = Vec::with_capacity(operations_size);

Expand All @@ -818,13 +855,15 @@ async fn execute(
}),
None => TableReference::none(),
};
let column = project_schema_df.field_with_name(qualifier.as_ref(), field.get_name())?;
let name = delta_field.get_name();
let column = Column::new(qualifier.clone(), name);
let field = project_schema_df.field_with_name(qualifier.as_ref(), name)?;

for (idx, (operations, _)) in ops.iter().enumerate() {
let op = operations
.get(&field.get_name().to_owned().into())
.get(&column)
.map(|expr| expr.to_owned())
.unwrap_or_else(|| col(Column::new(qualifier.clone(), field.get_name())));
.unwrap_or_else(|| col(column.clone()));

when_expr.push(lit(idx as i32));
then_expr.push(op);
Expand All @@ -845,12 +884,12 @@ async fn execute(
state.execution_props(),
)?;

projection_map.insert(field.get_name(), expressions.len());
let name = "__delta_rs_c_".to_owned() + field.get_name();
projection_map.insert(delta_field.get_name(), expressions.len());
let name = "__delta_rs_c_".to_owned() + delta_field.get_name();

f.push(DFField::new_unqualified(
&name,
column.data_type().clone(),
field.data_type().clone(),
true,
));
expressions.push((case, name));
Expand Down Expand Up @@ -1218,13 +1257,13 @@ mod tests {
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
use datafusion::prelude::DataFrame;
use datafusion::prelude::SessionContext;
use datafusion_expr::col;
use datafusion_expr::lit;
use serde_json::json;
use std::sync::Arc;

use super::MergeBuilder;
use super::MergeMetrics;

async fn setup_table(partitions: Option<Vec<&str>>) -> DeltaTable {
Expand Down Expand Up @@ -1263,7 +1302,7 @@ mod tests {
.unwrap()
}

async fn setup_op() -> MergeBuilder {
async fn setup() -> (DeltaTable, DataFrame) {
let schema = get_arrow_schema(&None);
let table = setup_table(None).await;

Expand All @@ -1286,10 +1325,7 @@ mod tests {
)
.unwrap();
let source = ctx.read_batch(batch).unwrap();
DeltaOps(table)
.merge(source, col("target.id").eq(col("source.id")))
.with_source_alias("source")
.with_target_alias("target")
(table, source)
}

async fn assert_merge(table: DeltaTable, metrics: MergeMetrics) {
Expand Down Expand Up @@ -1321,8 +1357,12 @@ mod tests {

#[tokio::test]
async fn test_merge() {
let (mut table, metrics) = setup_op()
.await
let (table, source) = setup().await;

let (mut table, metrics) = DeltaOps(table)
.merge(source, col("target.id").eq(col("source.id")))
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|update| {
update
.update("value", col("source.value"))
Expand Down Expand Up @@ -1368,11 +1408,16 @@ mod tests {
#[tokio::test]
async fn test_merge_str() {
// Validate that users can use string predicates
let (mut table, metrics) = setup_op()
.await
// Also validates that update and set operations can contain the target alias
let (table, source) = setup().await;

let (mut table, metrics) = DeltaOps(table)
.merge(source, "target.id = source.id")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|update| {
update
.update("value", "source.value")
.update("target.value", "source.value")
.update("modified", "source.modified")
})
.unwrap()
Expand All @@ -1384,7 +1429,7 @@ mod tests {
.unwrap()
.when_not_matched_insert(|insert| {
insert
.set("id", "source.id")
.set("target.id", "source.id")
.set("value", "source.value")
.set("modified", "source.modified")
})
Expand Down Expand Up @@ -1414,6 +1459,79 @@ mod tests {
assert_merge(table, metrics).await;
}

#[tokio::test]
async fn test_merge_no_alias() {
    // Exercises a merge where neither a source alias nor a target alias is set.
    let (table, source) = setup().await;

    // Give every source column a `source_` prefix; the predicate and the
    // expressions below refer to the source columns by these new names.
    let source = source.with_column_renamed("id", "source_id").unwrap();
    let source = source
        .with_column_renamed("value", "source_value")
        .unwrap();
    let source = source
        .with_column_renamed("modified", "source_modified")
        .unwrap();

    // Build the merge step by step so each clause is visible on its own.
    let builder = DeltaOps(table).merge(source, "id = source_id");

    // Matched rows: overwrite `value` and `modified` from the source.
    let builder = builder
        .when_matched_update(|matched| {
            matched
                .update("value", "source_value")
                .update("modified", "source_modified")
        })
        .unwrap();

    // Target rows without a source match: bump `value` where it equals 1.
    let builder = builder
        .when_not_matched_by_source_update(|unmatched| {
            unmatched
                .predicate("value = arrow_cast(1, 'Int32')")
                .update("value", "value + cast(1 as int)")
        })
        .unwrap();

    // Source rows without a target match: insert them.
    let builder = builder
        .when_not_matched_insert(|row| {
            row.set("id", "source_id")
                .set("value", "source_value")
                .set("modified", "source_modified")
        })
        .unwrap();

    let (merged_table, merge_metrics) = builder.await.unwrap();

    assert_merge(merged_table, merge_metrics).await;
}

#[tokio::test]
async fn test_merge_failures() {
    // Case 1: the column being updated is qualified with the source alias,
    // so the merge is expected to return an error.
    let (table, source) = setup().await;
    let builder = DeltaOps(table)
        .merge(source, col("target.id").eq(col("source.id")))
        .with_source_alias("source")
        .with_target_alias("target");
    let builder = builder
        .when_matched_update(|matched| {
            matched
                .update("source.value", "source.value")
                .update("modified", "source.modified")
        })
        .unwrap();
    let outcome = builder.await;
    assert!(outcome.is_err());

    // Case 2: source and target are given the same alias ("source"), which is
    // expected to return an error.
    // NOTE(review): the predicate and the update still refer to `target.*`
    // although no "target" alias exists here, so the error may come from that
    // reference rather than from the duplicate alias. Confirm which check fires.
    let (table, source) = setup().await;
    let builder = DeltaOps(table)
        .merge(source, col("target.id").eq(col("source.id")))
        .with_source_alias("source")
        .with_target_alias("source");
    let builder = builder
        .when_matched_update(|matched| {
            matched
                .update("target.value", "source.value")
                .update("modified", "source.modified")
        })
        .unwrap();
    let outcome = builder.await;
    assert!(outcome.is_err());
}

#[tokio::test]
async fn test_merge_partitions() {
/* Validate the join predicate works with partition columns */
Expand Down