Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2300,13 +2300,19 @@ def insert(
render_kwargs: t.Dict[str, t.Any],
**kwargs: t.Any,
) -> None:
snapshot = kwargs["snapshot"]
# We should recreate MVs across supported engines (Snowflake, BigQuery etc) because
# if upstream tables were recreated (e.g FULL models), the MVs would be silently invalidated.
# The only exception to that rule is RisingWave which doesn't support CREATE OR REPLACE, so upstream
# models don't recreate their physical tables for the MVs to be invalidated.
# However, even for RW we still want to recreate MVs to avoid stale references, as is the case with normal views.
# The flag is_first_insert is used for that matter as a signal to recreate the MV if the snapshot's intervals
# have been cleared by `should_force_rebuild`
is_materialized_view = self._is_materialized_view(model)
must_recreate_view = not self.adapter.HAS_VIEW_BINDING or (
is_materialized_view and is_first_insert
)

if (
not snapshot.is_materialized_view
and self.adapter.HAS_VIEW_BINDING
and self.adapter.table_exists(table_name)
):
if self.adapter.table_exists(table_name) and not must_recreate_view:
logger.info("Skipping creation of the view '%s'", table_name)
return

Expand All @@ -2315,8 +2321,8 @@ def insert(
table_name,
query_or_df,
model.columns_to_types,
replace=not self.adapter.HAS_VIEW_BINDING,
materialized=self._is_materialized_view(model),
replace=must_recreate_view,
materialized=is_materialized_view,
view_properties=kwargs.get("physical_properties", model.physical_properties),
table_description=model.description,
column_descriptions=model.column_descriptions,
Expand Down
65 changes: 65 additions & 0 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import typing as t
import shutil
from datetime import datetime, timedelta, date
from unittest import mock
from unittest.mock import patch
import logging

import numpy as np # noqa: TID253
import pandas as pd # noqa: TID253
import pytest
Expand Down Expand Up @@ -3748,3 +3751,65 @@ def _set_config(gateway: str, config: Config) -> None:
"incremental_model",
"seed_model",
]


def test_materialized_view_evaluation(ctx: TestContext, mocker: MockerFixture):
    """Verify that a materialized view is recreated when its upstream table is rebuilt.

    On engines with CREATE OR REPLACE semantics, swapping an upstream physical
    table (e.g. a FULL model) silently invalidates dependent MVs, so the
    evaluator must recreate the MV on the first insert of a new snapshot.
    Case 2 below asserts both the recreation log line and the refreshed data.
    """
    adapter = ctx.engine_adapter
    dialect = ctx.dialect

    if not adapter.SUPPORTS_MATERIALIZED_VIEWS:
        pytest.skip(f"Skipping engine {dialect} as it does not support materialized views")
    elif dialect in ("snowflake", "databricks"):
        pytest.skip(f"Skipping {dialect} as they're not enabled on standard accounts")

    model_name = ctx.table("test_tbl")
    mview_name = ctx.table("test_mview")

    sqlmesh = ctx.create_context()

    sqlmesh.upsert_model(
        load_sql_based_model(
            d.parse(
                f"""
                MODEL (name {model_name}, kind FULL);

                SELECT 1 AS col
                """
            )
        )
    )

    sqlmesh.upsert_model(
        load_sql_based_model(
            d.parse(
                f"""
                MODEL (name {mview_name}, kind VIEW (materialized true));

                SELECT * FROM {model_name}
                """
            )
        )
    )

    def _assert_mview_value(value: int) -> None:
        # Query through the raw adapter so we read what the engine actually serves.
        df = adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=dialect)}")
        assert df["col"][0] == value

    # Case 1: Ensure that plan is successful and we can query the materialized view
    sqlmesh.plan(auto_apply=True, no_prompts=True)

    _assert_mview_value(value=1)

    # Case 2: Ensure that we can change the underlying table and the materialized view is recreated
    sqlmesh.upsert_model(
        load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
    )

    logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")

    # Use the declared pytest-mock fixture (previously unused) instead of reaching
    # for unittest.mock directly; mocker undoes the patch automatically at teardown.
    mock_logger = mocker.patch.object(logger, "info")
    sqlmesh.plan(auto_apply=True, no_prompts=True)

    # Guard with `call.args` so an info() call with no positional args cannot
    # raise IndexError and mask the real assertion failure.
    assert any(
        call.args and "Replacing view" in call.args[0] for call in mock_logger.call_args_list
    )

    _assert_mview_value(value=2)
47 changes: 0 additions & 47 deletions tests/core/engine_adapter/integration/test_integration_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,53 +441,6 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext):
assert row_diff.full_match_count == 1


def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQueryEngineAdapter):
    """Plan a FULL model plus a downstream materialized view, then verify the MV
    is queryable and reflects new data after the upstream model is replanned."""
    model_name = ctx.table("test_tbl")
    mview_name = ctx.table("test_mview")

    sqlmesh = ctx.create_context()

    def _upsert_sql(definition: str) -> None:
        # Parse the SQL model definition and register it with the context.
        sqlmesh.upsert_model(load_sql_based_model(d.parse(definition)))

    def _mview_col():
        # Read the MV's single column through the raw adapter.
        frame = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
        return frame["col"][0]

    _upsert_sql(
        f"""
        MODEL (name {model_name}, kind FULL);

        SELECT 1 AS col
        """
    )
    _upsert_sql(
        f"""
        MODEL (name {mview_name}, kind VIEW (materialized true));

        SELECT * FROM {model_name}
        """
    )

    # Case 1: Ensure that plan is successful and we can query the materialized view
    sqlmesh.plan(auto_apply=True, no_prompts=True)
    assert _mview_col() == 1

    # Case 2: Ensure that we can change the underlying table and the materialized view is recreated
    _upsert_sql(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col""")
    sqlmesh.plan(auto_apply=True, no_prompts=True)
    assert _mview_col() == 2


def test_correlation_id_in_job_labels(ctx: TestContext):
model_name = ctx.table("test")

Expand Down