Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/cloud/tcloud_getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ Models needing backfill (missing dates):
├── sqlmesh_example.incremental_model: 2020-01-01 - 2024-11-24
└── sqlmesh_example.seed_model: 2024-11-24 - 2024-11-24
Apply - Backfill Tables [y/n]: y
Creating physical tables ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 3/3 • 0:00:00

All model versions have been created successfully

[1/1] sqlmesh_example.seed_model evaluated in 0.00s
[1/1] sqlmesh_example.incremental_model evaluated in 0.01s
Expand Down
4 changes: 2 additions & 2 deletions docs/concepts/macros/macro_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ SQLMesh provides additional predefined variables used to modify model behavior b

* @runtime_stage - A string value denoting the current stage of the SQLMesh runtime. Typically used in models to conditionally execute pre/post-statements (learn more [here](../models/sql_models.md#optional-prepost-statements)). It returns one of these values:
* 'loading' - The project is being loaded into SQLMesh's runtime context.
* 'creating' - The model tables are being created.
* 'evaluating' - The model query logic is being evaluated.
* 'creating' - The model tables are being created for the first time. The data may be inserted during table creation.
* 'evaluating' - The model query logic is being evaluated, and the data is being inserted into the existing model table.
* 'promoting' - The model is being promoted in the target environment (view created during virtual layer update).
* 'demoting' - The model is being demoted in the target environment (view dropped during virtual layer update).
* 'auditing' - The audit is being run.
Expand Down
18 changes: 6 additions & 12 deletions docs/examples/incremental_time_full_walkthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,6 @@ Models needing backfill (missing dates):
Enter the backfill start date (eg. '1 year', '2020-01-01') or blank to backfill from the beginning of history:
Enter the backfill end date (eg. '1 month ago', '2020-01-01') or blank to backfill up until now:
Apply - Backfill Tables [y/n]: y
Creating physical table ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:07

All model versions have been created successfully

[1/1] demo__dev.incrementals_demo evaluated in 6.97s
Evaluating models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:06

Expand Down Expand Up @@ -640,9 +636,10 @@ Models:
├── tcloud_raw_data.product_usage
└── tcloud_raw_data.sales
Apply - Virtual Update [y/n]: y
Creating physical tables ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 15/15 • 0:00:03

All model versions have been created successfully
SKIP: No physical layer updates to perform

SKIP: No model batches to execute

Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:02

Expand Down Expand Up @@ -732,10 +729,6 @@ Models needing backfill (missing dates):
Enter the preview start date (eg. '1 year', '2020-01-01') or blank to backfill to preview starting from yesterday: 2024-10-27
Enter the preview end date (eg. '1 month ago', '2020-01-01') or blank to preview up until '2024-11-08 00:00:00':
Apply - Preview Tables [y/n]: y
Creating physical table ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:16

All model versions have been created successfully

[1/1] demo__dev.incrementals_demo evaluated in 6.18s
Evaluating models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 1/1 • 0:00:06

Expand Down Expand Up @@ -1249,9 +1242,10 @@ Models:
THEN 'Regular User'
Directly Modified: demo.incrementals_demo (Forward-only)
Apply - Virtual Update [y/n]: y
Creating physical tables ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 15/15 • 0:00:02

All model versions have been created successfully
SKIP: No physical layer updates to perform

SKIP: No model batches to execute

Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:02

Expand Down
4 changes: 0 additions & 4 deletions docs/integrations/dlt.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ Models needing backfill (missing dates):
├── sushi_dataset_sqlmesh.incremental_sushi_types: 2024-10-03 - 2024-10-03
└── sushi_dataset_sqlmesh.incremental_waiters: 2024-10-03 - 2024-10-03
Apply - Backfill Tables [y/n]: y
Creating physical table ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 3/3 • 0:00:00

All model versions have been created successfully

[1/1] sushi_dataset_sqlmesh.incremental__dlt_loads evaluated in 0.01s
[1/1] sushi_dataset_sqlmesh.incremental_sushi_types evaluated in 0.00s
[1/1] sushi_dataset_sqlmesh.incremental_waiters evaluated in 0.01s
Expand Down
3 changes: 2 additions & 1 deletion examples/multi/repo_1/models/a.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MODEL (
name bronze.a
name bronze.a,
kind FULL
);

SELECT
Expand Down
3 changes: 2 additions & 1 deletion examples/multi/repo_1/models/b.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MODEL (
name bronze.b
name bronze.b,
kind FULL
);

SELECT
Expand Down
3 changes: 2 additions & 1 deletion examples/multi/repo_2/models/c.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MODEL (
name silver.c
name silver.c,
kind FULL
);

SELECT DISTINCT col_a
Expand Down
3 changes: 2 additions & 1 deletion examples/multi/repo_2/models/d.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MODEL (
name silver.d
name silver.d,
kind FULL
);

SELECT
Expand Down
3 changes: 2 additions & 1 deletion examples/multi/repo_2/models/e.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MODEL (
name silver.e
name silver.e,
kind FULL
);

SELECT
Expand Down
28 changes: 25 additions & 3 deletions examples/sushi/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from sqlmesh.core.config.common import VirtualEnvironmentMode
from sqlmesh.core.config.common import VirtualEnvironmentMode, TableNamingConvention
from sqlmesh.core.config import (
AutoCategorizationMode,
BigQueryConnectionConfig,
Expand All @@ -27,6 +27,11 @@
defaults = {"dialect": "duckdb"}
model_defaults = ModelDefaultsConfig(**defaults)
model_defaults_iceberg = ModelDefaultsConfig(**defaults, storage_format="iceberg")
# SQL statements handed to the configs below through their `before_all` argument.
# They make sure the `raw` schema exists and recreate the `raw.demographics` view
# with a single placeholder row (customer_id=1, zip='00000').
before_all = [
"CREATE SCHEMA IF NOT EXISTS raw",
"DROP VIEW IF EXISTS raw.demographics",
"CREATE VIEW raw.demographics AS (SELECT 1 AS customer_id, '00000' AS zip)",
]


# A DuckDB config, in-memory by default.
Expand All @@ -52,6 +57,7 @@
"nomissingexternalmodels",
],
),
before_all=before_all,
)

bigquery_config = Config(
Expand All @@ -63,6 +69,7 @@
},
default_gateway="bq",
model_defaults=model_defaults,
before_all=before_all,
)

# A configuration used for SQLMesh tests.
Expand All @@ -75,6 +82,7 @@
)
),
model_defaults=model_defaults,
before_all=before_all,
)

# A configuration used for SQLMesh tests with virtual environment mode set to DEV_ONLY.
Expand All @@ -84,14 +92,15 @@
"plan": PlanConfig(
auto_categorize_changes=CategorizerConfig.all_full(),
),
}
},
)

# A DuckDB config with a physical schema map.
map_config = Config(
default_connection=DuckDBConnectionConfig(),
physical_schema_mapping={"^sushi$": "company_internal"},
model_defaults=model_defaults,
before_all=before_all,
)

# A config representing isolated systems with a gateway per system
Expand All @@ -103,6 +112,7 @@
},
default_gateway="dev",
model_defaults=model_defaults,
before_all=before_all,
)

required_approvers_config = Config(
Expand Down Expand Up @@ -137,19 +147,21 @@
),
],
model_defaults=model_defaults,
before_all=before_all,
)


environment_suffix_table_config = Config(
default_connection=DuckDBConnectionConfig(),
model_defaults=model_defaults,
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
before_all=before_all,
)

environment_suffix_catalog_config = environment_suffix_table_config.model_copy(
update={
"environment_suffix_target": EnvironmentSuffixTarget.CATALOG,
}
},
)

CATALOGS = {
Expand All @@ -161,6 +173,7 @@
default_connection=DuckDBConnectionConfig(catalogs=CATALOGS),
default_test_connection=DuckDBConnectionConfig(catalogs=CATALOGS),
model_defaults=model_defaults,
before_all=before_all,
)

environment_catalog_mapping_config = Config(
Expand All @@ -177,4 +190,13 @@
"^prod$": "prod_catalog",
".*": "dev_catalog",
},
before_all=before_all,
)

# A copy of `config` with the physical table naming convention overridden to HASH_MD5.
hash_md5_naming_config = config.copy(
update={"physical_table_naming_convention": TableNamingConvention.HASH_MD5}
)

# A copy of `config` with the physical table naming convention overridden to TABLE_ONLY.
table_only_naming_config = config.copy(
update={"physical_table_naming_convention": TableNamingConvention.TABLE_ONLY}
)
4 changes: 2 additions & 2 deletions sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -4074,8 +4074,8 @@ def _format_node_error(ex: NodeExecutionFailedError) -> str:
node_name = ""
if isinstance(error.node, SnapshotId):
node_name = error.node.name
elif isinstance(error.node, tuple):
node_name = error.node[0]
elif hasattr(error.node, "snapshot_name"):
node_name = error.node.snapshot_name

msg = _format_node_error(error)
msg = " " + msg.replace("\n", "\n ")
Expand Down
6 changes: 4 additions & 2 deletions sqlmesh/core/context_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ def create(
infer_python_dependencies=infer_python_dependencies,
)

previous_environment_statements = state_reader.get_environment_statements(environment)
previous_environment_statements = (
state_reader.get_environment_statements(env.name) if env else []
)

if existing_env and always_recreate_environment:
previous_plan_id: t.Optional[str] = existing_env.plan_id
Expand Down Expand Up @@ -288,7 +290,7 @@ def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextD
previous_finalized_snapshots=env.previous_finalized_snapshots,
previous_requirements=env.requirements,
requirements=env.requirements,
previous_environment_statements=[],
previous_environment_statements=environment_statements,
environment_statements=environment_statements,
previous_gateway_managed_virtual_layer=env.gateway_managed,
gateway_managed_virtual_layer=env.gateway_managed,
Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,14 +443,20 @@ def replace_query(
target_table=target_table,
source_columns=source_columns,
)
if not target_columns_to_types and table_exists:
target_columns_to_types = self.columns(target_table)
query = source_queries[0].query_factory()
target_columns_to_types = target_columns_to_types or self.columns(target_table)
self_referencing = any(
quote_identifiers(table) == quote_identifiers(target_table)
for table in query.find_all(exp.Table)
)
# If a query references itself then it must have a table created regardless of approach used.
if self_referencing:
if not target_columns_to_types:
raise SQLMeshError(
f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. "
"Try casting the columns to an expected type or defining the columns in the model metadata. "
)
self._create_table_from_columns(
target_table,
target_columns_to_types,
Expand All @@ -472,6 +478,7 @@ def replace_query(
**kwargs,
)
if self_referencing:
assert target_columns_to_types is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an explicit check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just sharing this thread for some more context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have an explicit check for this on line 455. This is our 2nd entry into the self_referencing check so at this point we indeed assert that target_columns_to_types is not None.

with self.temp_table(
self._select_columns(target_columns_to_types).from_(target_table),
name=target_table,
Expand Down
4 changes: 3 additions & 1 deletion sqlmesh/core/engine_adapter/base_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def columns(
self.execute(sql)
resp = self.cursor.fetchall()
if not resp:
raise SQLMeshError("Could not get columns for table '%s'. Table not found.", table_name)
raise SQLMeshError(
f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
)
return {
column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
for column_name, data_type in resp
Expand Down
8 changes: 4 additions & 4 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from sqlmesh.core.context_diff import ContextDiff
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.plan.common import should_force_rebuild
from sqlmesh.core.plan.common import should_force_rebuild, is_breaking_kind_change
from sqlmesh.core.plan.definition import (
Plan,
SnapshotMapping,
Expand Down Expand Up @@ -597,7 +597,7 @@ def _categorize_snapshots(
forward_only = self._forward_only or self._is_forward_only_change(s_id)
if forward_only and s_id.name in self._context_diff.modified_snapshots:
new, old = self._context_diff.modified_snapshots[s_id.name]
if should_force_rebuild(old, new) or snapshot.is_seed:
if is_breaking_kind_change(old, new) or snapshot.is_seed:
# Breaking kind changes and seed changes can't be forward-only.
forward_only = False

Expand All @@ -622,7 +622,7 @@ def _categorize_snapshot(
if self._context_diff.directly_modified(s_id.name):
if self._auto_categorization_enabled:
new, old = self._context_diff.modified_snapshots[s_id.name]
if should_force_rebuild(old, new):
if is_breaking_kind_change(old, new):
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False)
return

Expand Down Expand Up @@ -780,7 +780,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
if snapshot.name in self._context_diff.modified_snapshots:
_, old = self._context_diff.modified_snapshots[snapshot.name]
# If the model kind has changed in a breaking way, then we can't consider this to be a forward-only change.
if snapshot.is_model and should_force_rebuild(old, snapshot):
if snapshot.is_model and is_breaking_kind_change(old, snapshot):
return False
return (
snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/core/plan/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@


def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
    """Decide whether moving from ``old`` to ``new`` requires a forced rebuild.

    Args:
        old: The previous snapshot.
        new: The updated snapshot.

    Returns:
        True when ``new`` is a view that is indirect non-breaking and not
        forward-only; otherwise the result of ``is_breaking_kind_change``.
    """
    # Views have to be rebuilt so that they pick up updated upstream dependencies.
    view_requires_refresh = (
        new.is_view and new.is_indirect_non_breaking and not new.is_forward_only
    )
    if not view_requires_refresh:
        return is_breaking_kind_change(old, new)
    return True


def is_breaking_kind_change(old: Snapshot, new: Snapshot) -> bool:
if old.virtual_environment_mode != new.virtual_environment_mode:
# If the virtual environment mode has changed, then we need to rebuild
return True
Expand Down
12 changes: 12 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ def visit_physical_layer_update_stage(
success=completion_status is not None and completion_status.is_success
)

def visit_physical_layer_schema_creation_stage(
    self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
) -> None:
    """Create the physical schemas for the snapshots carried by ``stage``.

    Args:
        stage: Supplies the snapshots and the deployability index passed to
            the snapshot evaluator.
        plan: The plan being evaluated (not read by this method).

    Raises:
        PlanError: If schema creation fails for any reason; the original
            exception is chained as the cause.
    """
    try:
        # Everything stays inside the try so any failure surfaces as a PlanError.
        create_schemas = self.snapshot_evaluator.create_physical_schemas
        create_schemas(stage.snapshots, stage.deployability_index)
    except Exception as err:
        raise PlanError("Plan application failed.") from err

def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
if plan.empty_backfill:
intervals_to_add = []
Expand Down Expand Up @@ -243,6 +253,8 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
circuit_breaker=self._circuit_breaker,
start=plan.start,
end=plan.end,
allow_destructive_snapshots=plan.allow_destructive_models,
selected_snapshot_ids=stage.selected_snapshot_ids,
)
if errors:
raise PlanError("Plan application failed.")
Expand Down
Loading