type_2_scd_generic_upsert does not handle NULL values properly #121

Open
@dgcaron

Description

The type_2_scd_generic_upsert function currently checks for changes in a row by evaluating each column individually, and this does not yield the expected result when a column contains NULL. A possible fix, which may also improve performance, is to add a hash column calculated from the contents of the table's columns (excluding the SCD2 system columns).

some background: https://datacadamia.com/dit/owb/scd2_hash
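
To make the NULL problem concrete, here is a minimal sketch of why a plain inequality check can miss a change. It uses Python's built-in sqlite3 as a stand-in for Spark SQL (an assumption made only so the example runs without a cluster); both follow SQL three-valued logic, where `NULL <> 'Paris'` evaluates to NULL and the row is filtered out. The table and column names (`base`, `updates`, `pkey`, `city`) are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE base (pkey INTEGER, city TEXT)")
conn.execute("CREATE TABLE updates (pkey INTEGER, city TEXT)")
conn.execute("INSERT INTO base VALUES (1, NULL)")        # current value is NULL
conn.execute("INSERT INTO updates VALUES (1, 'Paris')")  # incoming value is not

join = "FROM updates JOIN base ON base.pkey = updates.pkey"

# Plain inequality: NULL <> 'Paris' is NULL, so the WHERE clause drops the row.
plain = conn.execute(
    f"SELECT updates.pkey {join} WHERE base.city <> updates.city"
).fetchall()

# NULL-safe inequality (IS NOT in SQLite; Spark SQL would use NOT (a <=> b)).
null_safe = conn.execute(
    f"SELECT updates.pkey {join} WHERE base.city IS NOT updates.city"
).fetchall()

print(plain)      # the change is missed
print(null_safe)  # the change is detected
```

A hash column sidesteps this as long as the hash itself is never NULL, because the comparison then always involves two concrete strings.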

Besides some supporting code, the most interesting changes would be:
a UDF to calculate the hash

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

differential_hash_udf = udf(
    lambda row: calculate_differential_hash(row, attr_col_names), StringType()
)
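
The issue does not show calculate_differential_hash itself. A minimal sketch of what such a function could look like is given below, assuming the row supports lookup by column name (a PySpark Row does, and so does a plain dict). The encoding scheme (an `N` tag for NULL, a length-prefixed `V` tag for values) and the choice of SHA-256 are assumptions for illustration, not what the author necessarily used.

```python
import hashlib


def calculate_differential_hash(row, attr_col_names):
    """Return a hex digest over the attribute columns of one row.

    NULL is encoded differently from any real value, and each value is
    length-prefixed so that ("ab", "c") and ("a", "bc") do not collide.
    """
    parts = []
    for name in attr_col_names:
        value = row[name]
        if value is None:
            parts.append("N")  # NULL marker, distinct from an empty string
        else:
            text = str(value)  # assumption: str() is a stable rendering of the value
            parts.append(f"V{len(text)}:{text}")
    payload = "|".join(parts)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Usage with plain dicts standing in for Spark rows:
cols = ["name", "city"]
h_null = calculate_differential_hash({"name": "a", "city": None}, cols)
h_empty = calculate_differential_hash({"name": "a", "city": ""}, cols)
print(h_null != h_empty)
```

Relying on `str()` is the weakest part of this sketch: floats, timestamps and nested types may need an explicit, type-aware rendering to keep hashes stable across runs.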

addition of the hash column to the update set

updates_df = updates_df.withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)

stage changes based on the hash of the columns instead of a column-by-column comparison

staged_part_1 = (
    updates_df.alias("updates")
    .join(delta_table.toDF().alias("base"), primary_key)
    .where(
        f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> updates.{differential_hash_col_name}"
    )
    .selectExpr("NULL as mergeKey", "updates.*")
)

merge using the hash column

import pyspark.sql.functions

res = (
    delta_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
    )
    .whenMatchedUpdate(
        condition=f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> staged_updates.{differential_hash_col_name}",
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        },
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)
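
To illustrate what the staging and merge steps are meant to achieve, the following is a plain-Python model of a hash-based SCD2 upsert over lists of dicts. It is only a sketch of the intended semantics, not the Delta Lake implementation. The column names (`is_current`, `end_time`, `effective_time`, `differential_hash`) and both helper functions are hypothetical.

```python
import hashlib


def row_hash(row, attr_cols):
    """NULL-aware hash over the attribute columns (same idea as the UDF)."""
    parts = []
    for col in attr_cols:
        value = row[col]
        text = None if value is None else str(value)
        parts.append("N" if text is None else f"V{len(text)}:{text}")
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


def scd2_upsert(base, updates, primary_key, attr_cols):
    """Apply updates to base and return the new list of rows (base is not mutated)."""
    result = [dict(row) for row in base]
    current = {row[primary_key]: row for row in result if row["is_current"]}
    for upd in updates:
        new_hash = row_hash(upd, attr_cols)
        existing = current.get(upd[primary_key])
        if existing is not None:
            if existing["differential_hash"] == new_hash:
                continue  # attributes unchanged: nothing to stage
            # Close the current version (the whenMatchedUpdate branch).
            existing["is_current"] = False
            existing["end_time"] = upd["effective_time"]
        # Insert the new version (the whenNotMatchedInsert branch).
        new_row = {**upd, "differential_hash": new_hash, "is_current": True, "end_time": None}
        result.append(new_row)
        current[upd[primary_key]] = new_row
    return result
```

In this model a change from NULL to a value closes the old row and inserts a new one, and re-applying the same updates is a no-op, which is the behaviour the hash comparison is intended to give.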

This is a breaking (not backwards-compatible) change to the current function, so you could consider making it a separate function?
