Skip to content

Comments

Fix(risingwave): Recreate materialized views#5195

Merged
VaggelisD merged 1 commit intomainfrom
vaggelisd/mview_replace
Aug 28, 2025
Merged

Fix(risingwave): Recreate materialized views#5195
VaggelisD merged 1 commit intomainfrom
vaggelisd/mview_replace

Conversation

@VaggelisD
Copy link
Contributor

@VaggelisD VaggelisD commented Aug 20, 2025

Fixes #5184

@VaggelisD VaggelisD requested a review from a team August 20, 2025 16:20
@xardasos
Copy link
Contributor

Thanks @VaggelisD for looking into this! I tested your fix and the main reported issue is resolved - the plan command no longer fails. However, I wonder if we could avoid dropping and then recreating exactly the same MV for RisingWave. It seems to be a regression compared to 0.197.4. Why do we want to do it in the streaming context?

Log PR5195

2025-08-21 08:48:45,328 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage CreateSnapshotRecordsStage (evaluator.py:125)
2025-08-21 08:48:45,492 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage PhysicalLayerUpdateStage (evaluator.py:125)
2025-08-21 08:48:45,495 - ThreadPoolExecutor-1_0 - sqlmesh.core.snapshot.evaluator - INFO - Listing data objects in schema dev.sqlmesh__sqlmesh_example (evaluator.py:359)
2025-08-21 08:48:45,495 - ThreadPoolExecutor-1_1 - sqlmesh.core.snapshot.evaluator - INFO - Listing data objects in schema dev.sqlmesh__reporting (evaluator.py:359)
2025-08-21 08:48:45,496 - ThreadPoolExecutor-1_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT current_catalog (base.py:2382)
2025-08-21 08:48:45,496 - ThreadPoolExecutor-1_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT current_catalog (base.py:2382)
2025-08-21 08:48:45,523 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.sqlmesh__reporting' (evaluator.py:1194)
2025-08-21 08:48:45,524 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE SCHEMA IF NOT EXISTS "sqlmesh__reporting" (base.py:2382)
2025-08-21 08:48:45,590 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.sqlmesh__sqlmesh_example' (evaluator.py:1194)
2025-08-21 08:48:45,591 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE SCHEMA IF NOT EXISTS "sqlmesh__sqlmesh_example" (base.py:2382)
2025-08-21 08:48:45,648 - ThreadPoolExecutor-2_0 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__seed_model__3896887493' (evaluator.py:1529)
2025-08-21 08:48:45,650 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__3896887493" ("id" INT, "item_id" INT, "event_date" DATE) (base.py:2382)
2025-08-21 08:48:45,653 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'reporting__event_summary_tumbling__1201413493' AND "table_schema" = 'sqlmesh__reporting' (base.py:2382)
2025-08-21 08:48:45,661 - ThreadPoolExecutor-2_1 - sqlmesh.core.snapshot.evaluator - INFO - Creating view 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493' (evaluator.py:2147)
2025-08-21 08:48:45,673 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE MATERIALIZED VIEW "sqlmesh__reporting"."reporting__event_summary_tumbling__1201413493" AS SELECT "window_start" AS "window_start", "window_end" AS "window_end", "event_type" AS "event_type", COUNT() AS "event_count", SUM("event_value") AS "total_value" FROM TUMBLE("click_events", "event_timestamp", INTERVAL '5 MINUTES') AS "_q_0" GROUP BY "window_start", "window_end", "event_type" (base.py:2382)
2025-08-21 08:48:46,131 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT current_catalog (base.py:2382)
2025-08-21 08:48:46,161 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__3896887493" WHERE TRUE (base.py:2382)
2025-08-21 08:48:46,353 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__3896887493" ("id", "item_id", "event_date") SELECT CAST("id" AS INT) AS "id", CAST("item_id" AS INT) AS "item_id", CAST("event_date" AS DATE) AS "event_date" FROM (VALUES "") AS "t"("id", "item_id", "event_date") (base.py:2382)
2025-08-21 08:48:46,564 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__incremental_model__3072704003' (evaluator.py:1529)
2025-08-21 08:48:46,567 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__3072704003" ("id" INT, "item_id" INT, "event_date" DATE) (base.py:2382)
2025-08-21 08:48:47,078 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Dry running model 'sqlmesh_example.incremental_model' (evaluator.py:1551)
2025-08-21 08:48:47,080 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT "seed_model"."id" AS "id", "seed_model"."item_id" AS "item_id", "seed_model"."event_date" AS "event_date" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__3896887493" AS "seed_model" WHERE ("seed_model"."event_date" <= CAST('1970-01-01' AS DATE) AND "seed_model"."event_date" >= CAST('1970-01-01' AS DATE)) AND FALSE LIMIT 0 (base.py:2382)
2025-08-21 08:48:47,093 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__full_model__1860901422' (evaluator.py:1529)
2025-08-21 08:48:47,094 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__1860901422" ("item_id" INT, "num_orders" BIGINT) (base.py:2382)
2025-08-21 08:48:47,388 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Dry running model 'sqlmesh_example.full_model' (evaluator.py:1551)
2025-08-21 08:48:47,389 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT "incremental_model"."item_id" AS "item_id", COUNT(DISTINCT "incremental_model"."id") AS "num_orders" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__3072704003" AS "incremental_model" WHERE FALSE GROUP BY "incremental_model"."item_id" LIMIT 0 (base.py:2382)
2025-08-21 08:48:47,395 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage BackfillStage (evaluator.py:125)
2025-08-21 08:48:47,502 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 3513990833> (evaluator.py:645)
2025-08-21 08:48:47,508 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2025-08-20 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."seed_model": 3621500949> (facade.py:619)
2025-08-21 08:48:47,513 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."seed_model": 3621500949> (interval.py:213)
2025-08-21 08:48:47,510 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2025-08-20 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493' (evaluator.py:700)
2025-08-21 08:48:47,514 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Replacing view 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493' (evaluator.py:2100)
2025-08-21 08:48:47,515 - ThreadPoolExecutor-3_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / DROP MATERIALIZED VIEW IF EXISTS "sqlmesh__reporting"."reporting__event_summary_tumbling__1201413493" CASCADE (base.py:2382)
2025-08-21 08:48:47,815 - ThreadPoolExecutor-3_2 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4259597077> (evaluator.py:645)
2025-08-21 08:48:47,842 - ThreadPoolExecutor-3_2 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2020-01-01 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__sqlmesh_example.sqlmesh_example__incremental_model__3072704003' (evaluator.py:700)
2025-08-21 08:48:47,845 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__3072704003" WHERE "event_date" BETWEEN CAST('2020-01-01' AS DATE) AND CAST('2025-08-20' AS DATE) (base.py:2382)
2025-08-21 08:48:48,136 - ThreadPoolExecutor-3_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE MATERIALIZED VIEW "sqlmesh__reporting"."reporting__event_summary_tumbling__1201413493" AS SELECT "window_start" AS "window_start", "window_end" AS "window_end", "event_type" AS "event_type", COUNT() AS "event_count", SUM("event_value") AS "total_value" FROM TUMBLE("click_events", "event_timestamp", INTERVAL '5 MINUTES') AS "_q_0" GROUP BY "window_start", "window_end", "event_type" (base.py:2382)
2025-08-21 08:48:48,139 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__3072704003" ("id", "item_id", "event_date") SELECT "id", "item_id", "event_date" FROM (SELECT "seed_model"."id" AS "id", "seed_model"."item_id" AS "item_id", "seed_model"."event_date" AS "event_date" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__3896887493" AS "seed_model" WHERE "seed_model"."event_date" <= CAST('2025-08-20' AS DATE) AND "seed_model"."event_date" >= CAST('2020-01-01' AS DATE)) AS "_subquery" WHERE "event_date" BETWEEN CAST('2020-01-01' AS DATE) AND CAST('2025-08-20' AS DATE) (base.py:2382)
2025-08-21 08:48:48,340 - ThreadPoolExecutor-3_2 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2020-01-01 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4259597077> (facade.py:619)
2025-08-21 08:48:48,341 - ThreadPoolExecutor-3_2 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4259597077> (interval.py:213)
2025-08-21 08:48:48,558 - ThreadPoolExecutor-3_2 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 2620392793> (evaluator.py:645)
2025-08-21 08:48:48,566 - ThreadPoolExecutor-3_2 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2020-01-01 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__sqlmesh_example.sqlmesh_example__full_model__1860901422' (evaluator.py:700)
2025-08-21 08:48:48,567 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT current_catalog (base.py:2382)
2025-08-21 08:48:48,592 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__1860901422" WHERE TRUE (base.py:2382)
2025-08-21 08:48:48,761 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__1860901422" ("item_id", "num_orders") SELECT "incremental_model"."item_id" AS "item_id", COUNT(DISTINCT "incremental_model"."id") AS "num_orders" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__3072704003" AS "incremental_model" GROUP BY "incremental_model"."item_id" (base.py:2382)
2025-08-21 08:48:49,007 - event-emitter - sqlmesh.core.analytics.dispatcher - INFO - Failed to emit events: HTTPSConnectionPool(host='analytics.tobikodata.com', port=443): Max retries exceeded with url: /v1/sqlmesh/ (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)'))) (dispatcher.py:138)
2025-08-21 08:48:49,042 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2025-08-20 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 3513990833> (facade.py:619)
2025-08-21 08:48:49,043 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 3513990833> (interval.py:213)
2025-08-21 08:48:49,239 - ThreadPoolExecutor-3_2 - sqlmesh.core.snapshot.evaluator - INFO - Auditing snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 2620392793> (evaluator.py:551)
2025-08-21 08:48:49,247 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT "rw_columns"."name" AS "column_name", "rw_columns"."data_type" AS "data_type" FROM "rw_catalog"."rw_columns" JOIN "rw_catalog"."rw_relations" ON "rw_relations"."id" = "rw_columns"."relation_id" JOIN "rw_catalog"."rw_schemas" ON "rw_schemas"."id" = "rw_relations"."schema_id" WHERE ("rw_relations"."name" = 'sqlmesh_example__full_model__1860901422' AND "rw_columns"."name" <> '_row_id' AND "rw_columns"."name" <> '_rw_timestamp') AND "rw_schemas"."name" = 'sqlmesh__sqlmesh_example' (base.py:2382)
2025-08-21 08:48:49,277 - ThreadPoolExecutor-3_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT COUNT() FROM (SELECT * FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__1860901422" AS "sqlmesh_example__full_model__1860901422" WHERE "item_id" < 0) AS "audit" (base.py:2382)
2025-08-21 08:48:49,293 - ThreadPoolExecutor-3_2 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2020-01-01 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 2620392793> (facade.py:619)
2025-08-21 08:48:49,293 - ThreadPoolExecutor-3_2 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 2620392793> (interval.py:213)
2025-08-21 08:48:49,389 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage EnvironmentRecordUpdateStage (evaluator.py:125)
2025-08-21 08:48:49,389 - MainThread - sqlmesh.core.state_sync.db.facade - INFO - Promoting environment 'prod' (facade.py:167)
2025-08-21 08:48:49,584 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage MigrateSchemasStage (evaluator.py:125)
2025-08-21 08:48:49,586 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / SELECT current_catalog (base.py:2382)
2025-08-21 08:48:49,601 - ThreadPoolExecutor-4_0 - sqlmesh.core.snapshot.evaluator - INFO - Migrating table schema 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493' to match 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493__dev' (evaluator.py:946)
2025-08-21 08:48:49,601 - ThreadPoolExecutor-4_0 - sqlmesh.core.snapshot.evaluator - INFO - Migrating view 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__1201413493' (evaluator.py:2177)
2025-08-21 08:48:49,612 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / DROP MATERIALIZED VIEW IF EXISTS "sqlmesh__reporting"."reporting__event_summary_tumbling__1201413493" CASCADE (base.py:2382)
2025-08-21 08:48:50,010 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /
SQLMESH_PLAN: b6e1b4d3d7ce4d37a9d37366af1ab50b / CREATE MATERIALIZED VIEW "sqlmesh__reporting"."reporting__event_summary_tumbling__1201413493" AS SELECT "window_start" AS "window_start", "window_end" AS "window_end", "event_type" AS "event_type", COUNT() AS "event_count", SUM("event_value") AS "total_value" FROM TUMBLE("click_events", "event_timestamp", INTERVAL '5 MINUTES') AS "_q_0" GROUP BY "window_start", "window_end", "event_type" (base.py:2382)
2025-08-21 08:48:51,411 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage UnpauseStage (evaluator.py:125)

In contrast, in 0.197.4, a MV is only created once.

Log 0.197.4

2025-08-21 08:59:47,841 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage CreateSnapshotRecordsStage (evaluator.py:115)
2025-08-21 08:59:48,060 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage PhysicalLayerUpdateStage (evaluator.py:115)
2025-08-21 08:59:48,075 - ThreadPoolExecutor-1_0 - sqlmesh.core.snapshot.evaluator - INFO - Listing data objects in schema dev.sqlmesh__sqlmesh_example (evaluator.py:350)
2025-08-21 08:59:48,076 - ThreadPoolExecutor-1_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT current_catalog (base.py:2237)
2025-08-21 08:59:48,077 - ThreadPoolExecutor-1_1 - sqlmesh.core.snapshot.evaluator - INFO - Listing data objects in schema dev.sqlmesh__reporting (evaluator.py:350)
2025-08-21 08:59:48,080 - ThreadPoolExecutor-1_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT current_catalog (base.py:2237)
2025-08-21 08:59:48,151 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.sqlmesh__sqlmesh_example' (evaluator.py:1143)
2025-08-21 08:59:48,153 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE SCHEMA IF NOT EXISTS "sqlmesh__sqlmesh_example" (base.py:2237)
2025-08-21 08:59:48,276 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.sqlmesh__reporting' (evaluator.py:1143)
2025-08-21 08:59:48,277 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE SCHEMA IF NOT EXISTS "sqlmesh__reporting" (base.py:2237)
2025-08-21 08:59:48,399 - ThreadPoolExecutor-2_1 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__seed_model__2185867172' (evaluator.py:1495)
2025-08-21 08:59:48,408 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" ("id" INT, "item_id" INT, "event_date" DATE) (base.py:2237)
2025-08-21 08:59:48,406 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'reporting__event_summary_tumbling__3036268106' AND "table_schema" = 'sqlmesh__reporting' (base.py:2237)
2025-08-21 08:59:48,440 - ThreadPoolExecutor-2_0 - sqlmesh.core.snapshot.evaluator - INFO - Creating view 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__3036268106' (evaluator.py:1940)
2025-08-21 08:59:48,509 - ThreadPoolExecutor-2_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE MATERIALIZED VIEW "sqlmesh__reporting"."reporting__event_summary_tumbling__3036268106" AS SELECT "window_start" AS "window_start", "window_end" AS "window_end", "event_type" AS "event_type", COUNT() AS "event_count", SUM("event_value") AS "total_value" FROM TUMBLE("click_events", "event_timestamp", INTERVAL '5 MINUTES') AS "_q_0" GROUP BY "window_start", "window_end", "event_type" (base.py:2237)
2025-08-21 08:59:48,591 - event-emitter - sqlmesh.core.analytics.dispatcher - INFO - Failed to emit events: HTTPSConnectionPool(host='analytics.tobikodata.com', port=443): Max retries exceeded with url: /v1/sqlmesh/ (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1007)'))) (dispatcher.py:138)
2025-08-21 08:59:48,817 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__seed_model__2185867172' AND "table_schema" = 'sqlmesh__sqlmesh_example' (base.py:2237)
2025-08-21 08:59:48,824 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" WHERE TRUE (base.py:2237)
2025-08-21 08:59:48,965 - ThreadPoolExecutor-2_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" ("id", "item_id", "event_date") SELECT CAST("id" AS INT) AS "id", CAST("item_id" AS INT) AS "item_id", CAST("event_date" AS DATE) AS "event_date" FROM (VALUES "") AS "t"("id", "item_id", "event_date") (base.py:2237)
2025-08-21 08:59:49,309 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__incremental_model__2906841977' (evaluator.py:1495)
2025-08-21 08:59:49,311 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" ("id" INT, "item_id" INT, "event_date" DATE) (base.py:2237)
2025-08-21 08:59:50,618 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Dry running model 'sqlmesh_example.incremental_model' (evaluator.py:1517)
2025-08-21 08:59:50,619 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT "seed_model"."id" AS "id", "seed_model"."item_id" AS "item_id", "seed_model"."event_date" AS "event_date" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" AS "seed_model" WHERE ("seed_model"."event_date" <= CAST('1970-01-01' AS DATE) AND "seed_model"."event_date" >= CAST('1970-01-01' AS DATE)) AND FALSE LIMIT 0 (base.py:2237)
2025-08-21 08:59:50,634 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Creating table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__full_model__3332047715' (evaluator.py:1495)
2025-08-21 08:59:50,635 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE TABLE IF NOT EXISTS "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__3332047715" ("item_id" INT, "num_orders" BIGINT) (base.py:2237)
2025-08-21 08:59:50,950 - ThreadPoolExecutor-2_2 - sqlmesh.core.snapshot.evaluator - INFO - Dry running model 'sqlmesh_example.full_model' (evaluator.py:1517)
2025-08-21 08:59:50,951 - ThreadPoolExecutor-2_2 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT "incremental_model"."item_id" AS "item_id", COUNT(DISTINCT "incremental_model"."id") AS "num_orders" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" AS "incremental_model" WHERE FALSE GROUP BY "incremental_model"."item_id" LIMIT 0 (base.py:2237)
2025-08-21 08:59:50,960 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage BackfillStage (evaluator.py:115)
2025-08-21 08:59:51,175 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 50710608> (evaluator.py:636)
2025-08-21 08:59:51,183 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2025-08-20 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."seed_model": 3467833469> (facade.py:621)
2025-08-21 08:59:51,190 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."seed_model": 3467833469> (interval.py:213)
2025-08-21 08:59:51,190 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2025-08-20 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__reporting.reporting__event_summary_tumbling__3036268106' (evaluator.py:690)
2025-08-21 08:59:51,223 - ThreadPoolExecutor-3_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'reporting__event_summary_tumbling__3036268106' AND "table_schema" = 'sqlmesh__reporting' (base.py:2237)
2025-08-21 08:59:51,239 - ThreadPoolExecutor-3_0 - sqlmesh.core.snapshot.evaluator - INFO - Skipping creation of the view 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__3036268106' (evaluator.py:1891)
2025-08-21 08:59:51,240 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2025-08-20 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 50710608> (facade.py:621)
2025-08-21 08:59:51,240 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 50710608> (interval.py:213)
2025-08-21 08:59:51,490 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4283318177> (evaluator.py:636)
2025-08-21 08:59:51,529 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2020-01-01 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__sqlmesh_example.sqlmesh_example__incremental_model__2906841977' (evaluator.py:690)
2025-08-21 08:59:51,533 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" WHERE "event_date" BETWEEN CAST('2020-01-01' AS DATE) AND CAST('2025-08-20' AS DATE) (base.py:2237)
2025-08-21 08:59:51,591 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" ("id", "item_id", "event_date") SELECT "id", "item_id", "event_date" FROM (SELECT "seed_model"."id" AS "id", "seed_model"."item_id" AS "item_id", "seed_model"."event_date" AS "event_date" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" AS "seed_model" WHERE "seed_model"."event_date" <= CAST('2025-08-20' AS DATE) AND "seed_model"."event_date" >= CAST('2020-01-01' AS DATE)) AS "_subquery" WHERE "event_date" BETWEEN CAST('2020-01-01' AS DATE) AND CAST('2025-08-20' AS DATE) (base.py:2237)
2025-08-21 08:59:51,682 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2020-01-01 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4283318177> (facade.py:621)
2025-08-21 08:59:51,683 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4283318177> (interval.py:213)
2025-08-21 08:59:51,790 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 237739027> (evaluator.py:636)
2025-08-21 08:59:51,818 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2020-01-01 00:00:00, 2025-08-21 00:00:00) into dev.sqlmesh__sqlmesh_example.sqlmesh_example__full_model__3332047715' (evaluator.py:690)
2025-08-21 08:59:51,820 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT 1 FROM "information_schema"."tables" WHERE "table_name" = 'sqlmesh_example__full_model__3332047715' AND "table_schema" = 'sqlmesh__sqlmesh_example' (base.py:2237)
2025-08-21 08:59:51,829 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DELETE FROM "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__3332047715" WHERE TRUE (base.py:2237)
2025-08-21 08:59:51,874 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: INSERT INTO "sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__3332047715" ("item_id", "num_orders") SELECT "incremental_model"."item_id" AS "item_id", COUNT(DISTINCT "incremental_model"."id") AS "num_orders" FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" AS "incremental_model" GROUP BY "incremental_model"."item_id" (base.py:2237)
2025-08-21 08:59:51,966 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Auditing snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 237739027> (evaluator.py:542)
2025-08-21 08:59:51,982 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT "attname" AS "column_name", "pg_catalog".format_type("atttypid", "atttypmod") AS "data_type" FROM "pg_catalog"."pg_attribute" JOIN "pg_catalog"."pg_class" ON "pg_class"."oid" = "attrelid" JOIN "pg_catalog"."pg_namespace" ON "pg_namespace"."oid" = "relnamespace" WHERE ("attnum" > 0 AND NOT "attisdropped" AND "relname" = 'sqlmesh_example__full_model__3332047715') AND "nspname" = 'sqlmesh__sqlmesh_example' (base.py:2237)
2025-08-21 08:59:52,007 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: SELECT COUNT(
) FROM (SELECT * FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__3332047715" AS "sqlmesh_example__full_model__3332047715" WHERE "item_id" < 0) AS "audit" (base.py:2237)
2025-08-21 08:59:52,021 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2020-01-01 00:00:00, 2025-08-21 00:00:00) for snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 237739027> (facade.py:621)
2025-08-21 08:59:52,021 - ThreadPoolExecutor-3_1 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 237739027> (interval.py:213)
2025-08-21 08:59:52,188 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage EnvironmentRecordUpdateStage (evaluator.py:115)
2025-08-21 08:59:52,189 - MainThread - sqlmesh.core.state_sync.db.facade - INFO - Promoting environment 'prod' (facade.py:169)
2025-08-21 08:59:52,566 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage UnpauseStage (evaluator.py:115)
2025-08-21 08:59:52,591 - MainThread - sqlmesh.core.state_sync.db.snapshot - INFO - Unpausing snapshot SnapshotId<"dev"."reporting"."event_summary_tumbling": 50710608> (snapshot.py:144)
2025-08-21 08:59:52,592 - MainThread - sqlmesh.core.state_sync.db.snapshot - INFO - Unpausing snapshot SnapshotId<"dev"."sqlmesh_example"."full_model": 237739027> (snapshot.py:144)
2025-08-21 08:59:52,598 - MainThread - sqlmesh.core.state_sync.db.snapshot - INFO - Unpausing snapshot SnapshotId<"dev"."sqlmesh_example"."seed_model": 3467833469> (snapshot.py:144)
2025-08-21 08:59:52,599 - MainThread - sqlmesh.core.state_sync.db.snapshot - INFO - Unpausing snapshot SnapshotId<"dev"."sqlmesh_example"."incremental_model": 4283318177> (snapshot.py:144)
2025-08-21 08:59:52,694 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage VirtualLayerUpdateStage (evaluator.py:115)
2025-08-21 08:59:52,701 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.reporting' (evaluator.py:1143)
2025-08-21 08:59:52,703 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE SCHEMA IF NOT EXISTS "reporting" (base.py:2237)
2025-08-21 08:59:52,769 - MainThread - sqlmesh.core.snapshot.evaluator - INFO - Creating schema 'dev.sqlmesh_example' (evaluator.py:1143)
2025-08-21 08:59:52,770 - MainThread - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE SCHEMA IF NOT EXISTS "sqlmesh_example" (base.py:2237)
2025-08-21 08:59:52,832 - ThreadPoolExecutor-4_0 - sqlmesh.core.snapshot.evaluator - INFO - Updating view 'dev.sqlmesh_example.seed_model' to point at table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__seed_model__2185867172' (evaluator.py:1450)
2025-08-21 08:59:52,835 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DROP VIEW IF EXISTS "sqlmesh_example"."seed_model" CASCADE (base.py:2237)
2025-08-21 08:59:52,839 - ThreadPoolExecutor-4_1 - sqlmesh.core.snapshot.evaluator - INFO - Updating view 'dev.reporting.event_summary_tumbling' to point at table 'dev.sqlmesh__reporting.reporting__event_summary_tumbling__3036268106' (evaluator.py:1450)
2025-08-21 08:59:52,842 - ThreadPoolExecutor-4_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DROP VIEW IF EXISTS "reporting"."event_summary_tumbling" CASCADE (base.py:2237)
2025-08-21 08:59:52,844 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE VIEW "sqlmesh_example"."seed_model" AS SELECT * FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__seed_model__2185867172" (base.py:2237)
2025-08-21 08:59:52,851 - ThreadPoolExecutor-4_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE VIEW "reporting"."event_summary_tumbling" AS SELECT * FROM "dev"."sqlmesh__reporting"."reporting__event_summary_tumbling__3036268106" (base.py:2237)
2025-08-21 08:59:53,315 - ThreadPoolExecutor-4_0 - sqlmesh.core.snapshot.evaluator - INFO - Updating view 'dev.sqlmesh_example.incremental_model' to point at table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__incremental_model__2906841977' (evaluator.py:1450)
2025-08-21 08:59:53,325 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DROP VIEW IF EXISTS "sqlmesh_example"."incremental_model" CASCADE (base.py:2237)
2025-08-21 08:59:53,326 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE VIEW "sqlmesh_example"."incremental_model" AS SELECT * FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__incremental_model__2906841977" (base.py:2237)
2025-08-21 08:59:53,415 - ThreadPoolExecutor-4_0 - sqlmesh.core.snapshot.evaluator - INFO - Updating view 'dev.sqlmesh_example.full_model' to point at table 'dev.sqlmesh__sqlmesh_example.sqlmesh_example__full_model__3332047715' (evaluator.py:1450)
2025-08-21 08:59:53,417 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: DROP VIEW IF EXISTS "sqlmesh_example"."full_model" CASCADE (base.py:2237)
2025-08-21 08:59:53,419 - ThreadPoolExecutor-4_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: CREATE VIEW "sqlmesh_example"."full_model" AS SELECT * FROM "dev"."sqlmesh__sqlmesh_example"."sqlmesh_example__full_model__3332047715" (base.py:2237)
2025-08-21 08:59:53,489 - MainThread - sqlmesh.core.plan.evaluator - INFO - Evaluating plan stage FinalizeEnvironmentStage (evaluator.py:115)
2025-08-21 08:59:53,490 - MainThread - sqlmesh.core.state_sync.db.environment - INFO - Finalizing environment 'prod' (environment.py:139)
2025-08-21 08:59:53,618 - MainThread - root - INFO - Shutting down the event dispatcher (dispatcher.py:159)

@VaggelisD
Copy link
Contributor Author

VaggelisD commented Aug 21, 2025

Hey @xardasos, thank you for confirming. Check out the explanation in the initial PR, the solution here was intended to cover all the other dialects.

However, I tested the problematic scenario (i.e dropping the underlying table, which invalidates the mview in other engines) and that doesn't seem to be allowed in RisingWave, e.g:

dev=> create table bar as (select 1 as col);
INSERT 0 1

dev=> create materialized view foo as (select * from bar);
CREATE_MATERIALIZED_VIEW

dev=> drop table bar;
ERROR:  Failed to run the query. Caused by: Permission denied:  table used by 1 other objects.

dev=> CREATE OR REPLACE TABLE bar AS (SELECT 2 AS col);
ERROR:  Failed to run the query Caused by: Feature is not yet implemented: CREATE OR REPLACE TABLE

Thus, the reason RisingWave never had an issue with materialized views is because even for FULL models we can't replace the table, so instead we end up deleting and reinserting all the rows, meaning that the mview won't be invalidated.

I can push a new fix to not recreate the views in RW and you can give it a shot if you want.

@VaggelisD VaggelisD force-pushed the vaggelisd/mview_replace branch from 30882da to 636c1db Compare August 21, 2025 10:17
@xardasos
Copy link
Contributor

Thanks @VaggelisD for explaining. The newest version seems to be working correctly, nice!

@VaggelisD VaggelisD force-pushed the vaggelisd/mview_replace branch 3 times, most recently from f56d1fd to b0ea051 Compare August 21, 2025 14:12
@VaggelisD VaggelisD changed the title Fix: Materialized views not being recreated Fix(risingwave): Do not always recreate materialized views Aug 21, 2025
@xardasos
Copy link
Contributor

xardasos commented Aug 21, 2025

@VaggelisD I discovered one more problem in the current version. In case of indirect non-breaking changes, the MVs are not being recreated when it might be necessary.

Let's assume we have :
MV A(id, amount)
MV B(id). B is defined as select id from A.
Assume that these MVs have fingerprints A_123 and B_567 (B_567 consumes from A_123).
Suppose we add a new column amount2 to A and execute plan. Such change would be classified as direct non-breaking for A and indirect non-breaking for B. sqlmesh creates MV A_124.

In 0.197.4 sqlmesh also drops B_567 and recreates it so that it now consumes from A_124 instead of A_123.

In this new version this recreation doesn't happen. This can cause issues when the previous version A_123 gets finally cascade deleted (B_567 still consumes from A_123).

@VaggelisD
Copy link
Contributor Author

VaggelisD commented Aug 21, 2025

@xardasos This is expected afaict, the indirect non-breaking changes don't trigger a backfill. Do you mean that A_127 would bring new data in id and since B_567 is not being refreshed, that would be left with "stale" data?

In any event, if the optimization here is flaky I think it'd be best if I revert it and instead force the RisingWave adapter to always recreate the views.

@xardasos
Copy link
Contributor

xardasos commented Aug 22, 2025

@VaggelisD, if we don't recreate B, then MV B_567 continues to be physically attached to A_123 (B_567 is defined in the DB as select id from A_123 - RW MVs are tightly coupled) while sqlmesh virtual view A points to the new MV A_124. Eventually, we want to get rid of the old MV A_123 in favor of A_124 (I guess we don't want to keep both versions A_123 and A_124 forever). However, if we do this, then B_567 and sqlmesh virtual view B get deleted as well due to the tight coupling and the drop cascading effect which probably is not what we want.

I think we should keep the optimization in the most basic and common workflow (adding a new MV) and at least not make things worse compared to 0.197.4. Having to create each MV twice (when it is not necessary), in my opinion, is a huge performance blow, as it sometimes takes hours or even days to create a big MV.

However, after investigating this a bit more I think that your changes are now correct. I'm not 100% sure, but I now suspect that the reason why the recreation no longer happens could be the very recent changes from #5189. It looks to me that one problem with these changes is that now requires_schema_migration_in_prod is evaluated based on is_materialized https://github.com/TobikoData/sqlmesh/pull/5189/files#diff-71500552b618c1153561f3878bf71b50e0e98c6871b5770c2fa0902c2e6dd89fR1388 and this function returns false for materialized views https://github.com/TobikoData/sqlmesh/blob/main/sqlmesh/core/model/kind.py#L130 (because materialized views are views) which looks odd. Perhaps MVs were not taken into account while working on #5189? /cc @izeigerman

(I'll be away until 07.09 and may not be able to reply during this time.)

@VaggelisD
Copy link
Contributor Author

Hey @xardasos, with that in mind I think I'll reenable the force recreation since the MV will either (1) be replaced due to a breaking/indirect breaking change or (2) it should be replaced in indirect non breaking to avoid stale references, meaning that it should probably always be replaced.

If you can optimize this further it'd be great if you can contribute a well-tested PR with your additional usecases.

@VaggelisD VaggelisD changed the title Fix(risingwave): Do not always recreate materialized views Fix(risingwave): Recreate materialized views Aug 25, 2025
@VaggelisD VaggelisD force-pushed the vaggelisd/mview_replace branch from 4424859 to 147d85d Compare August 26, 2025 08:39
@VaggelisD VaggelisD force-pushed the vaggelisd/mview_replace branch from 147d85d to ff2e7f5 Compare August 28, 2025 08:45
@VaggelisD VaggelisD force-pushed the vaggelisd/mview_replace branch from ff2e7f5 to c42ced4 Compare August 28, 2025 11:31
@VaggelisD VaggelisD merged commit 4c94ee1 into main Aug 28, 2025
36 checks passed
@VaggelisD VaggelisD deleted the vaggelisd/mview_replace branch August 28, 2025 14:30
@xardasos
Copy link
Contributor

xardasos commented Sep 9, 2025

Hey @VaggelisD, thanks again for this fix, it is definitely a step in the right direction. However, I think there is still a regression compared to the version 0.197.4, where MVs are created only once in basic scenarios (e.g. adding a new MV) and the indirect non breaking changes are handled correctly. I have created a follow-up issue #5331.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RisingWave: SQLMesh plan fails due to an attempt to create the same MV twice

5 participants