Skip to content

Commit af94e58

Browse files
authored
(feat) allows adding SQL statements to a schema migration that are executed after tables are created/altered (#2791)
1 parent 3f2ed85 commit af94e58

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ help:
3737
install-uv:
3838
ifneq ($(VIRTUAL_ENV),)
3939
$(error you cannot be under virtual environment $(VIRTUAL_ENV))
40-
endif u
40+
endif
4141
curl -LsSf https://astral.sh/uv/install.sh | sh
4242

4343
has-uv:

dlt/destinations/job_client_impl.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
COLUMN_HINTS,
3434
TColumnType,
3535
TColumnSchemaBase,
36+
TPartialTableSchema,
3637
)
3738
from dlt.common.schema.utils import (
3839
get_inherited_table_hint,
@@ -624,7 +625,10 @@ def _get_storage_table_query_columns(self) -> List[str]:
624625
return fields
625626

626627
def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTables:
627-
sql_scripts, schema_update = self._build_schema_update_sql(only_tables)
628+
# Only `only_tables` are included, or all if None.
629+
sql_scripts, schema_update = self._build_schema_update_sql(
630+
list(self.get_storage_tables(only_tables or self.schema.tables.keys()))
631+
)
628632
# Stay within max query size when doing DDL.
629633
# Some DB backends use bytes not characters, so decrease the limit by half,
630634
# assuming most of the characters in DDL encoded into single bytes.
@@ -633,25 +637,23 @@ def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTable
633637
return schema_update
634638

635639
def _build_schema_update_sql(
636-
self, only_tables: Iterable[str]
640+
self, storage_tables: Iterable[Tuple[str, TTableSchemaColumns]]
637641
) -> Tuple[List[str], TSchemaTables]:
638642
"""Generates CREATE/ALTER sql for tables that differ between the destination and in the client's Schema.
639643
640-
This method compares all or `only_tables` defined in `self.schema` to the respective tables in the destination.
641-
It detects only new tables and new columns.
642-
Any other changes like data types, hints, etc. are ignored.
644+
This method compares schema tables to the respective tables in the destination passed in `storage_tables`.
645+
It detects only new tables and new columns. Any other changes like data types, hints, etc. are ignored.
643646
644647
Args:
645-
only_tables (Iterable[str]): Only `only_tables` are included, or all if None.
648+
storage_tables (Iterable[Tuple[str, TTableSchemaColumns]]): list of storage tables (tuples (name, column schema))
646649
647650
Returns:
648651
Tuple[List[str], TSchemaTables]: Tuple with a list of CREATE/ALTER scripts, and a list of all tables with columns that will be added.
649652
"""
650653
sql_updates = []
654+
post_sql_updates = []
651655
schema_update: TSchemaTables = {}
652-
for table_name, storage_columns in self.get_storage_tables(
653-
only_tables or self.schema.tables.keys()
654-
):
656+
for table_name, storage_columns in storage_tables:
655657
# this will skip incomplete columns
656658
new_columns = self._create_table_update(table_name, storage_columns)
657659
generate_alter = len(storage_columns) > 0
@@ -668,6 +670,14 @@ def _build_schema_update_sql(
668670
# keep only new columns
669671
partial_table["columns"] = {c["name"]: c for c in new_columns}
670672
schema_update[table_name] = partial_table
673+
post_sql_statements = self._get_table_post_update_sql(partial_table)
674+
for sql in post_sql_statements:
675+
if not sql.endswith(";"):
676+
sql += ";"
677+
post_sql_updates.append(sql)
678+
679+
# add post sql updates at the end
680+
sql_updates.extend(post_sql_updates)
671681

672682
return sql_updates, schema_update
673683

@@ -678,6 +688,7 @@ def _make_add_column_sql(
678688
return [f"ADD COLUMN {self._get_column_def_sql(c, table)}" for c in new_columns]
679689

680690
def _make_create_table(self, qualified_name: str, table: PreparedTableSchema) -> str:
691+
"""Begins CREATE TABLE statement"""
681692
not_exists_clause = " "
682693
if (
683694
table["name"] in self.schema.dlt_table_names()
@@ -689,7 +700,9 @@ def _make_create_table(self, qualified_name: str, table: PreparedTableSchema) ->
689700
def _get_table_update_sql(
690701
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
691702
) -> List[str]:
692-
# build sql
703+
"""Generates a list of SQL statements that updates table `table_name` to include `new_columns`
704+
columns. `generate_alter` is set to True if table already exists in destination.
705+
"""
693706
qualified_name = self.sql_client.make_qualified_table_name(table_name)
694707
table = self.prepare_load_table(table_name)
695708
sql_result: List[str] = []
@@ -719,8 +732,15 @@ def _get_table_update_sql(
719732
def _get_constraints_sql(
720733
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
721734
) -> str:
735+
"""Creates or alters additional constraints that are not inlined into CREATE/ALTER TABLE statements"""
722736
return ""
723737

738+
def _get_table_post_update_sql(self, partial_table: TPartialTableSchema) -> List[str]:
739+
"""Generates SQL statements that are executed after all tables are migrated, e.g. statements containing foreign references.
740+
`partial_table` contains all table hints and new columns with their hints.
741+
"""
742+
return []
743+
724744
def _check_table_update_hints(
725745
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
726746
) -> None:

0 commit comments

Comments
 (0)