Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1876,20 +1876,12 @@ def insert(
)
snapshot = kwargs["snapshot"]
snapshots = kwargs["snapshots"]

if (
(
isinstance(query_or_df, exp.Expression)
and snapshot.is_materialized_view
and deployability_index.is_deployable(snapshot)
and model.render_query(
snapshots=snapshots,
deployability_index=deployability_index,
engine_adapter=self.adapter,
)
== query_or_df
)
or self.adapter.HAS_VIEW_BINDING
) and self.adapter.table_exists(table_name):
not snapshot.is_materialized_view
and self.adapter.HAS_VIEW_BINDING
and self.adapter.table_exists(table_name)
):
logger.info("Skipping creation of the view '%s'", table_name)
return

Expand Down
47 changes: 47 additions & 0 deletions tests/core/engine_adapter/integration/test_integration_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,50 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext):

assert row_diff.stats["join_count"] == 1
assert row_diff.full_match_count == 1


def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQueryEngineAdapter):
model_name = ctx.table("test_tbl")
mview_name = ctx.table("test_mview")

sqlmesh = ctx.create_context()

sqlmesh.upsert_model(
load_sql_based_model(
d.parse(
f"""
MODEL (name {model_name}, kind FULL);

SELECT 1 AS col
"""
)
)
)

sqlmesh.upsert_model(
load_sql_based_model(
d.parse(
f"""
MODEL (name {mview_name}, kind VIEW (materialized true));

SELECT * FROM {model_name}
"""
)
)
)

# Case 1: Ensure that plan is successful and we can query the materialized view
sqlmesh.plan(auto_apply=True, no_prompts=True)

df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
assert df["col"][0] == 1

# Case 2: Ensure that we can change the underlying table and the materialized view is recreated
sqlmesh.upsert_model(
load_sql_based_model(d.parse(f"""MODEL (name {model_name}, kind FULL); SELECT 2 AS col"""))
)

sqlmesh.plan(auto_apply=True, no_prompts=True)

df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
assert df["col"][0] == 2
21 changes: 2 additions & 19 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,25 +516,8 @@ def test_evaluate_materialized_view(
snapshots={},
)

adapter_mock.table_exists.assert_called_once_with(snapshot.table_name())

if view_exists:
# Evaluation shouldn't take place because the rendered query hasn't changed
# since the last view creation.
assert not adapter_mock.create_view.called
else:
# If the view doesn't exist, it should be created even if the rendered query
# hasn't changed since the last view creation.
adapter_mock.create_view.assert_called_once_with(
snapshot.table_name(),
model.render_query(),
model.columns_to_types,
replace=True,
materialized=True,
view_properties={},
table_description=None,
column_descriptions={},
)
# Ensure that the materialized view is recreated even if it exists
assert adapter_mock.create_view.assert_called


def test_evaluate_materialized_view_with_partitioned_by_cluster_by(
Expand Down