Description
The current way the type_2_scd_generic_upsert function checks for changes in a row is to evaluate each column individually, and this does not always yield the expected result. A possible improvement, both for performance and to cater for NULL values, is to add a hash column calculated from the contents of the columns in the table (except the SCD2 system columns).
Some background: https://datacadamia.com/dit/owb/scd2_hash
Besides some surrounding code, the most interesting changes would be:
A UDF to calculate the hash:
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

differential_hash_udf = udf(
    lambda row: calculate_differential_hash(row, attr_col_names), StringType()
)
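For illustration, a minimal sketch of what calculate_differential_hash could look like; the separator, the NULL sentinel and the sha256 digest are assumptions, chosen so that NULLs still contribute to the hash:

import hashlib

def calculate_differential_hash(row, attr_col_names):
    # Illustrative only: map NULLs to a sentinel so they still affect the hash,
    # build a stable string from the attribute values, then digest it.
    parts = ["<NULL>" if row[c] is None else str(row[c]) for c in attr_col_names]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()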
Addition of the hash column to the update set:
updates_df = updates_df.withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)
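Note that the comparisons below also rely on the base Delta table carrying the same hash column; for an existing table that would presumably require a one-off backfill, roughly along these lines (the save path is an assumption):

# Hypothetical one-off backfill so existing base rows also carry the hash column;
# without it, the base-side hash referenced in the conditions below would not exist.
backfilled = delta_table.toDF().withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)
backfilled.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).save(base_table_path)  # base_table_path: assumed location of the base table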
Stage changes based on the hash of the columns instead of a column-by-column comparison:
staged_part_1 = (
    updates_df.alias("updates")
    .join(delta_table.toDF().alias("base"), primary_key)
    .where(
        f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> updates.{differential_hash_col_name}"
    )
    .selectExpr("NULL as mergeKey", "updates.*")
)
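The staged_updates used in the merge below is not shown in full; following the usual two-part SCD2 staging pattern it would presumably be this part unioned with the updates keyed by the primary key, roughly:

# Hypothetical second staging part plus union: rows keyed by the primary key drive
# the matched update, while the NULL-mergeKey rows force inserts of the new version
# of each changed record.
staged_part_2 = updates_df.selectExpr(f"{primary_key} as mergeKey", "*")
staged_updates = staged_part_1.union(staged_part_2)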
Merge using the hash column:
res = (
    delta_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
    )
    .whenMatchedUpdate(
        condition=f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> staged_updates.{differential_hash_col_name}",
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        },
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)
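res_thing (the insert values) is also not spelled out above; presumably it maps each attribute plus the SCD2 system columns for a new current row, something like:

# Hypothetical insert mapping: copy the attribute and hash columns from the staged row
# and initialise the SCD2 system columns for a new current version.
res_thing = {col: f"staged_updates.{col}" for col in attr_col_names}
res_thing.update(
    {
        primary_key: f"staged_updates.{primary_key}",
        differential_hash_col_name: f"staged_updates.{differential_hash_col_name}",
        is_current_col_name: "true",
        effective_time_col_name: f"staged_updates.{effective_time_col_name}",
        end_time_col_name: "null",
    }
)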
This is a breaking (not backwards-compatible) change to what the function does now, so you could consider making this a separate function?