-
Notifications
You must be signed in to change notification settings - Fork 243
Open
Labels
Engine: Snowflake (issues related to Snowflake) · Improvement (improves existing functionality)
Description
I have an SCD Type 2 model written in Snowpark as follows:
from datetime import datetime
from snowflake.snowpark import functions as F
from snowflake.snowpark.dataframe import DataFrame
from sqlmesh import ExecutionContext
from sqlmesh import model
from sqlmesh.core.model.kind import ModelKindName
@model(
    name="playground.test",
    kind={
        "name": ModelKindName.SCD_TYPE_2_BY_TIME,
        "unique_key": ["id"],
        "disable_restatement": False,
    },
    start="2023-01-01",
    columns={
        "id": "int",
        "value": "int",
        "updated_at": "timestamp",
        "valid_from": "timestamp",
        "valid_to": "timestamp",
    },
    interval_unit="day",
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    **kwargs,
) -> DataFrame:
    """Build the source rows for the ``playground.test`` SCD Type 2 model.

    Reads ``playground.test_source``, keeps the rows whose ``updated_at``
    lies between ``start`` and ``end`` (both bounds inclusive), and returns
    ``id``, ``value`` and a constant ``updated_at``.

    Args:
        context: Execution context used to resolve the upstream table name
            and to reach the Snowpark session.
        start: Lower bound applied to ``updated_at``.
        end: Upper bound applied to ``updated_at``.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A Snowpark DataFrame with the columns ``id``, ``value`` and
        ``updated_at``.
    """
    # Upstream models (including external tables)
    test_source_table = context.resolve_table("playground.test_source")
    test_source = context.snowpark.table(test_source_table)

    # Filter to only include latest data by using updated_at column
    test = test_source.filter(F.col("updated_at").between(start, end))

    # updated_at is replaced by a fixed date literal, so every returned row
    # carries the same change timestamp regardless of the source value.
    test = test.select(
        F.col("id"),
        F.col("value"),
        F.to_date(F.lit("2025-05-16")).as_("updated_at"),
    )
    return test
This model compiles to this SQL query when executed:
-- Rebuilds the whole SCD Type 2 table: CREATE OR REPLACE ... AS SELECT over the
-- union of "static", "updated_rows" and "inserted_rows" defined below.
CREATE OR REPLACE TABLE "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271" AS
WITH "source" AS (
-- "source": incoming rows from the temp table, cast to the target types and
-- reduced to one row per "ID" (_row_number = 1).
-- NOTE(review): the window orders by the partition key itself, so every row in
-- a partition ties and the duplicate that is kept is arbitrary.
SELECT
"_exists",
"ID",
"VALUE",
"UPDATED_AT"
FROM (
SELECT
TRUE AS "_exists",
"ID" AS "ID",
"VALUE" AS "VALUE",
CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT",
ROW_NUMBER() OVER (PARTITION BY "ID" ORDER BY "ID") AS _row_number
FROM (
SELECT
CAST("ID" AS DECIMAL(38, 0)) AS "ID",
CAST("VALUE" AS DECIMAL(38, 0)) AS "VALUE",
CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT"
FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."__temp_PLAYGROUND__TEST__2041369271_yybwmep4"
) AS "raw_source"
) AS _t
WHERE
_row_number = 1
), "static" AS (
-- "static": existing rows of the target table that already have a "VALID_TO"
-- (closed versions); they are passed through to the final result unchanged.
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"VALID_FROM",
"VALID_TO",
TRUE AS "_exists"
FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271"
WHERE
NOT "VALID_TO" IS NULL
), "latest" AS (
-- "latest": existing rows of the target table whose "VALID_TO" is NULL
-- (open versions).
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"VALID_FROM",
"VALID_TO",
TRUE AS "_exists"
FROM "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271"
WHERE
"VALID_TO" IS NULL
), "deleted" AS (
-- "deleted": "static" rows left-joined to "latest" on "ID" and filtered on
-- "latest"."VALID_TO" IS NULL.
-- NOTE(review): every "latest" row has "VALID_TO" IS NULL by definition, so this
-- filter also keeps "static" rows that DO have a matching "latest" row; confirm
-- whether only unmatched rows were intended.
SELECT
"static"."ID",
"static"."VALUE",
"static"."UPDATED_AT",
"static"."VALID_FROM",
"static"."VALID_TO"
FROM "static"
LEFT JOIN "latest"
ON "static"."ID" = "latest"."ID"
WHERE
"latest"."VALID_TO" IS NULL
), "latest_deleted" AS (
-- "latest_deleted": the greatest "VALID_TO" per "ID" among the "deleted" rows,
-- exposed under the join key "_key0".
SELECT
TRUE AS "_exists",
"ID" AS "_key0",
MAX("VALID_TO") AS "VALID_TO"
FROM "deleted"
GROUP BY
"ID"
), "joined" AS (
-- "joined": full outer join of "latest" and "source" on "ID", written as a
-- LEFT JOIN plus a RIGHT JOIN restricted to source rows with no "latest" match.
-- Existing-table columns are prefixed with "t_"; source columns keep their names.
SELECT
"source"."_exists" AS "_exists",
"latest"."ID" AS "t_ID",
"latest"."VALUE" AS "t_VALUE",
"latest"."UPDATED_AT" AS "t_UPDATED_AT",
"latest"."VALID_FROM" AS "t_VALID_FROM",
"latest"."VALID_TO" AS "t_VALID_TO",
"source"."ID" AS "ID",
"source"."VALUE" AS "VALUE",
"source"."UPDATED_AT" AS "UPDATED_AT"
FROM "latest"
LEFT JOIN "source"
ON "latest"."ID" = "source"."ID"
UNION ALL
SELECT
"source"."_exists" AS "_exists",
"latest"."ID" AS "t_ID",
"latest"."VALUE" AS "t_VALUE",
"latest"."UPDATED_AT" AS "t_UPDATED_AT",
"latest"."VALID_FROM" AS "t_VALID_FROM",
"latest"."VALID_TO" AS "t_VALID_TO",
"source"."ID" AS "ID",
"source"."VALUE" AS "VALUE",
"source"."UPDATED_AT" AS "UPDATED_AT"
FROM "latest"
RIGHT JOIN "source"
ON "latest"."ID" = "source"."ID"
WHERE
"latest"."_exists" IS NULL
), "updated_rows" AS (
-- "updated_rows": one output row per "joined" row. Existing ("t_") values win
-- over source values where both are present.
SELECT
COALESCE("joined"."t_ID", "joined"."ID") AS "ID",
COALESCE("joined"."t_VALUE", "joined"."VALUE") AS "VALUE",
COALESCE("joined"."t_UPDATED_AT", "joined"."UPDATED_AT") AS "UPDATED_AT",
-- VALID_FROM: with no existing open row and a prior deletion, the later of the
-- deletion's "VALID_TO" and the source "UPDATED_AT"; with no existing open row
-- and no deletion, the 1970-01-01 epoch; otherwise the existing "VALID_FROM".
CASE
WHEN "t_VALID_FROM" IS NULL AND NOT "latest_deleted"."_exists" IS NULL
THEN CASE
WHEN "latest_deleted"."VALID_TO" > "UPDATED_AT"
THEN "latest_deleted"."VALID_TO"
ELSE "UPDATED_AT"
END
WHEN "t_VALID_FROM" IS NULL
THEN CAST('1970-01-01 00:00:00' AS TIMESTAMPNTZ(9))
ELSE "t_VALID_FROM"
END AS "VALID_FROM",
-- VALID_TO: closed at the source "UPDATED_AT" when the source row is newer than
-- the existing one; otherwise the existing "VALID_TO" is kept.
CASE
WHEN "joined"."UPDATED_AT" > "joined"."t_UPDATED_AT"
THEN "joined"."UPDATED_AT"
ELSE "t_VALID_TO"
END AS "VALID_TO"
FROM "joined"
LEFT JOIN "latest_deleted"
ON "joined"."ID" = "latest_deleted"."_key0"
), "inserted_rows" AS (
-- "inserted_rows": a new open version (VALID_TO = NULL, VALID_FROM = source
-- "UPDATED_AT") for every "joined" row whose source is newer than the existing row.
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"UPDATED_AT" AS "VALID_FROM",
CAST(NULL AS TIMESTAMPNTZ(9)) AS "VALID_TO"
FROM "joined"
WHERE
"joined"."UPDATED_AT" > "joined"."t_UPDATED_AT"
)
-- Final result: closed history + updated rows + newly opened versions, cast to
-- the target column types.
SELECT
CAST("ID" AS DECIMAL(38, 0)) AS "ID",
CAST("VALUE" AS DECIMAL(38, 0)) AS "VALUE",
CAST("UPDATED_AT" AS TIMESTAMPNTZ(9)) AS "UPDATED_AT",
CAST("VALID_FROM" AS TIMESTAMPNTZ(9)) AS "VALID_FROM",
CAST("VALID_TO" AS TIMESTAMPNTZ(9)) AS "VALID_TO"
FROM (
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"VALID_FROM",
"VALID_TO"
FROM "static"
UNION ALL
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"VALID_FROM",
"VALID_TO"
FROM "updated_rows"
UNION ALL
SELECT
"ID",
"VALUE",
"UPDATED_AT",
"VALID_FROM",
"VALID_TO"
FROM "inserted_rows"
) AS "_subquery"
I noticed that the query performs the following steps:
- Deduplicates the source data (`source`).
- Separates historical and active records (`static`, `latest`).
- Identifies deleted records (`deleted`).
- Tracks the latest deletion per key (`latest_deleted`).
- Merges source and existing data (`joined`).
- Updates existing records (`updated_rows`).
- Inserts new records (`inserted_rows`).
- The final SELECT combines:
  - Historical records (`static`).
  - Updated records (`updated_rows`).
  - Newly inserted records (`inserted_rows`).
However, I think this way of upserting (updating and inserting) data is inefficient: no matter how few rows are being inserted, the whole table is always rebuilt, which is very slow when the table is large.
A better way of performing the same operations might be to use a MERGE INTO command instead, which avoids the union with the `static` data already in the table, as follows (this might not be accurate or correct; it is only meant to give an idea):
-- Incremental alternative to the full rebuild: apply only the changed and new
-- versions to the target table instead of re-creating it.
-- "updated_rows" and "inserted_rows" are the CTEs of the same names from the
-- rebuild query. They are not defined in this sketch and must be supplied,
-- e.g. through a WITH clause inside the USING subquery.
MERGE INTO "SOURCE_DB"."sqlmesh__PLAYGROUND"."PLAYGROUND__TEST__2041369271" AS target
USING (
    SELECT
        "ID",
        "VALUE",
        "UPDATED_AT",
        "VALID_FROM",
        "VALID_TO"
    FROM "updated_rows"
    UNION ALL
    SELECT
        "ID",
        "VALUE",
        "UPDATED_AT",
        "VALID_FROM",
        "VALID_TO"
    FROM "inserted_rows"
) AS source
    -- Match on the version ("ID" + "VALID_FROM"), not on "ID" alone. The target
    -- holds several versions per "ID", and the source can carry two rows for
    -- the same "ID" (the closed version from "updated_rows" and the new open
    -- version from "inserted_rows"). Matching on "ID" only would overwrite
    -- historical versions and make the merge nondeterministic.
    ON target."ID" = source."ID"
    AND target."VALID_FROM" = source."VALID_FROM"
WHEN MATCHED THEN
    -- "VALID_FROM" is part of the match condition, so it never changes here.
    UPDATE SET
        target."VALUE" = source."VALUE",
        target."UPDATED_AT" = source."UPDATED_AT",
        target."VALID_TO" = source."VALID_TO"
WHEN NOT MATCHED THEN
    INSERT ("ID", "VALUE", "UPDATED_AT", "VALID_FROM", "VALID_TO")
    VALUES (source."ID", source."VALUE", source."UPDATED_AT", source."VALID_FROM", source."VALID_TO");
Metadata
Metadata
Assignees
Labels
Engine: Snowflake (issues related to Snowflake) · Improvement (improves existing functionality)