Skip to content

SCD Type 2 Snowpark upsert query not optimized #4689

@zunedz

Description

@zunedz

I have an SCD Type 2 model written in Snowpark, defined as follows:

from datetime import datetime

from snowflake.snowpark import functions as F
from snowflake.snowpark.dataframe import DataFrame
from sqlmesh import ExecutionContext
from sqlmesh import model
from sqlmesh.core.model.kind import ModelKindName


@model(
    name="playground.test",
    kind={
        "name": ModelKindName.SCD_TYPE_2_BY_TIME,
        "unique_key": ["id"],
        "disable_restatement": False,
    },
    start="2023-01-01",
    columns={
        "id": "int",
        "value": "int",
        "updated_at": "timestamp",
        "valid_from": "timestamp",
        "valid_to": "timestamp",
    },
    interval_unit="day",
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    **kwargs,
) -> DataFrame:
    """Build the source rows for the ``playground.test`` SCD Type 2 model.

    Reads ``playground.test_source``, keeps the rows whose ``updated_at``
    lies between ``start`` and ``end``, and projects ``id``, ``value`` and
    ``updated_at``.

    Args:
        context: SQLMesh execution context; used to resolve the upstream
            table name and to reach the Snowpark session.
        start: Lower bound applied to ``updated_at``.
        end: Upper bound applied to ``updated_at``.
        **kwargs: Additional arguments passed by the caller; unused here.

    Returns:
        A Snowpark DataFrame with the columns ``id``, ``value`` and
        ``updated_at``.
    """
    # Upstream models (including external tables)
    source_table_name = context.resolve_table("playground.test_source")
    source_df = context.snowpark.table(source_table_name)

    # Only keep rows whose updated_at lies within the processed interval.
    in_interval = source_df.filter(F.col("updated_at").between(start, end))

    # NOTE(review): updated_at is replaced by a fixed date literal here, so
    # every returned row carries the same change timestamp. Presumably this
    # is deliberate for the reproduction -- confirm before reusing the model.
    return in_interval.select(
        F.col("id"),
        F.col("value"),
        F.to_date(F.lit("2025-05-16")).as_("updated_at"),
    )

This model compiles to this SQL query when executed:

-- Replaces the whole target table with the result of the query below.
-- The "static" and "latest" CTEs read from the same table that is being
-- replaced, so every existing row is re-read and re-written on each run.
CREATE OR REPLACE TABLE "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271" AS
-- "source": incoming rows from the temp table, cast to the model's column
-- types and reduced to one row per "ID".
WITH "source" AS (
  SELECT
    "_exists",
    "ID",
    "VALUE",
    "UPDATED_AT"
  FROM (
    SELECT
      TRUE AS "_exists",
      "ID" AS "ID",
      "VALUE" AS "VALUE",
      CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT",
      -- NOTE(review): the window is ordered by its own partition key, so the
      -- ordering does not decide which duplicate of an "ID" gets number 1.
      ROW_NUMBER() OVER (PARTITION BY "ID" ORDER BY "ID") AS _row_number
    FROM (
      SELECT
        CAST("ID" AS DECIMAL(38, 0)) AS "ID",
        CAST("VALUE" AS DECIMAL(38, 0)) AS "VALUE",
        CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT"
      FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."__temp_PLAYGROUND__TEST__2041369271_yybwmep4"
    ) AS "raw_source"
  ) AS _t
  WHERE
    _row_number = 1
-- "static": existing rows that already have a "VALID_TO" (closed versions).
-- They are passed through unchanged by the final SELECT.
), "static" AS (
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO",
    TRUE AS "_exists"
  FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271"
  WHERE
    NOT "VALID_TO" IS NULL
-- "latest": existing rows whose "VALID_TO" is NULL (open versions).
), "latest" AS (
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO",
    TRUE AS "_exists"
  FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271"
  WHERE
    "VALID_TO" IS NULL
-- "deleted": closed rows left-joined to the open rows on "ID".
-- NOTE(review): every row in "latest" has "VALID_TO" NULL by construction, so
-- the filter below is satisfied by matched and unmatched rows alike.
), "deleted" AS (
  SELECT
    "static"."ID",
    "static"."VALUE",
    "static"."UPDATED_AT",
    "static"."VALID_FROM",
    "static"."VALID_TO"
  FROM "static"
  LEFT JOIN "latest"
    ON "static"."ID" = "latest"."ID"
  WHERE
    "latest"."VALID_TO" IS NULL
-- "latest_deleted": the greatest "VALID_TO" per "ID" among the "deleted" rows.
), "latest_deleted" AS (
  SELECT
    TRUE AS "_exists",
    "ID" AS "_key0",
    MAX("VALID_TO") AS "VALID_TO"
  FROM "deleted"
  GROUP BY
    "ID"
-- "joined": open target rows (t_* columns) paired with source rows on "ID".
-- The first branch keeps every open row; the second branch adds source rows
-- that have no open row. Together they act as a full outer join.
), "joined" AS (
  SELECT
    "source"."_exists" AS "_exists",
    "latest"."ID" AS "t_ID",
    "latest"."VALUE" AS "t_VALUE",
    "latest"."UPDATED_AT" AS "t_UPDATED_AT",
    "latest"."VALID_FROM" AS "t_VALID_FROM",
    "latest"."VALID_TO" AS "t_VALID_TO",
    "source"."ID" AS "ID",
    "source"."VALUE" AS "VALUE",
    "source"."UPDATED_AT" AS "UPDATED_AT"
  FROM "latest"
  LEFT JOIN "source"
    ON "latest"."ID" = "source"."ID"
  UNION ALL
  SELECT
    "source"."_exists" AS "_exists",
    "latest"."ID" AS "t_ID",
    "latest"."VALUE" AS "t_VALUE",
    "latest"."UPDATED_AT" AS "t_UPDATED_AT",
    "latest"."VALID_FROM" AS "t_VALID_FROM",
    "latest"."VALID_TO" AS "t_VALID_TO",
    "source"."ID" AS "ID",
    "source"."VALUE" AS "VALUE",
    "source"."UPDATED_AT" AS "UPDATED_AT"
  FROM "latest"
  RIGHT JOIN "source"
    ON "latest"."ID" = "source"."ID"
  WHERE
    "latest"."_exists" IS NULL
-- "updated_rows": one output row per "joined" row. The existing target values
-- win when present (COALESCE takes the t_* column first).
), "updated_rows" AS (
  SELECT
    COALESCE("joined"."t_ID", "joined"."ID") AS "ID",
    COALESCE("joined"."t_VALUE", "joined"."VALUE") AS "VALUE",
    COALESCE("joined"."t_UPDATED_AT", "joined"."UPDATED_AT") AS "UPDATED_AT",
    -- VALID_FROM: with no open target row, use the later of the last closed
    -- "VALID_TO" and the source "UPDATED_AT" when closed versions exist, else
    -- the 1970-01-01 default; otherwise keep the existing "VALID_FROM".
    CASE
      WHEN "t_VALID_FROM" IS NULL AND NOT "latest_deleted"."_exists" IS NULL
      THEN CASE
        WHEN "latest_deleted"."VALID_TO" > "UPDATED_AT"
        THEN "latest_deleted"."VALID_TO"
        ELSE "UPDATED_AT"
      END
      WHEN "t_VALID_FROM" IS NULL
      THEN CAST('1970-01-01 00:00:00' AS TIMESTAMPNTZ(9))
      ELSE "t_VALID_FROM"
    END AS "VALID_FROM",
    -- VALID_TO: close the open row at the source "UPDATED_AT" when the source
    -- is newer; otherwise keep the existing "VALID_TO".
    CASE
      WHEN "joined"."UPDATED_AT" > "joined"."t_UPDATED_AT"
      THEN "joined"."UPDATED_AT"
      ELSE "t_VALID_TO"
    END AS "VALID_TO"
  FROM "joined"
  LEFT JOIN "latest_deleted"
    ON "joined"."ID" = "latest_deleted"."_key0"
-- "inserted_rows": a new open version (NULL "VALID_TO") for each "joined" row
-- whose source "UPDATED_AT" is newer than the open target row's.
), "inserted_rows" AS (
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "UPDATED_AT" AS "VALID_FROM",
    CAST(NULL AS TIMESTAMPNTZ(9)) AS "VALID_TO"
  FROM "joined"
  WHERE
    "joined"."UPDATED_AT" > "joined"."t_UPDATED_AT"
)
-- Final result: closed rows, updated rows and inserted rows concatenated and
-- cast to the model's column types.
SELECT
  CAST("ID" AS DECIMAL(38, 0)) AS "ID",
  CAST("VALUE" AS DECIMAL(38, 0)) AS "VALUE",
  CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT",
  CAST("VALID_FROM" AS TIMESTAMPNTZ(9)) AS "VALID_FROM",
  CAST("VALID_TO" AS TIMESTAMPNTZ(9)) AS "VALID_TO"
FROM (
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO"
  FROM "static"
  UNION ALL
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO"
  FROM "updated_rows"
  UNION ALL
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO"
  FROM "inserted_rows"
) AS "_subquery"

I noticed that the query tries to do the following things:

  1. Deduplicate Source Data.
  2. Separate Historical and Active Records.
  3. Identify Deleted Records.
  4. Track Latest Deletions.
  5. Merge Source and Existing Data.
  6. Update Existing Records.
  7. Insert New Records.
  8. The final SELECT combines:
    • Historical records (static).
    • Updated records (updated_rows).
    • Newly inserted records (inserted_rows).

However, I think this way of upserting (updating and inserting) data is inefficient. No matter how little data is being inserted, the whole table is always rebuilt, which is very slow when the table is large.

A better way to perform the same operations might be to use the MERGE INTO command instead, which avoids the UNION ALL with the static data already in the table. For example (this might not be accurate or correct; it is only meant to illustrate the idea):

-- Sketch of an incremental alternative to rebuilding the table.
-- "updated_rows" and "inserted_rows" are the CTEs of the compiled
-- CREATE OR REPLACE TABLE query; they still have to be defined (for example
-- in a WITH clause inside the USING subquery) before this statement can run.
MERGE INTO "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271" AS target
USING (
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO"
  FROM "updated_rows"
  UNION ALL
  SELECT
    "ID",
    "VALUE",
    "UPDATED_AT",
    "VALID_FROM",
    "VALID_TO"
  FROM "inserted_rows"
) AS source
-- Match on the version key ("ID" + "VALID_FROM"), not on "ID" alone. The
-- source holds two rows for a changed "ID" (the closed version and the new
-- one) and the target holds one row per historical version, so joining on
-- "ID" only would pair several source rows with the same target row and make
-- the merge nondeterministic.
-- NOTE(review): assumes ("ID", "VALID_FROM") is unique in the target -- confirm.
ON target."ID" = source."ID"
  AND target."VALID_FROM" = source."VALID_FROM"
-- An existing version: the row from "updated_rows" carries its new "VALID_TO".
WHEN MATCHED THEN
  UPDATE SET
    target."VALUE" = source."VALUE",
    target."UPDATED_AT" = source."UPDATED_AT",
    target."VALID_FROM" = source."VALID_FROM",
    target."VALID_TO" = source."VALID_TO"
-- A version the target does not have yet: a brand-new "ID" or the new open
-- version of a changed "ID".
WHEN NOT MATCHED THEN
  INSERT ("ID", "VALUE", "UPDATED_AT", "VALID_FROM", "VALID_TO")
  VALUES (source."ID", source."VALUE", source."UPDATED_AT", source."VALID_FROM", source."VALID_TO");

Metadata

Metadata

Assignees

No one assigned

    Labels

    Engine: Snowflake (Issues related to Snowflake), Improvement (Improves existing functionality)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions