Skip to content
41 changes: 41 additions & 0 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
TSchemaEvolutionMode,
TSchemaSettings,
TSimpleRegex,
TTableReferenceStandalone,
TStoredSchema,
TSchemaTables,
TTableReference,
TTableSchema,
TTableSchemaColumns,
TColumnSchema,
Expand Down Expand Up @@ -606,6 +608,45 @@ def tables(self) -> TSchemaTables:
"""Dictionary of schema tables"""
return self._schema_tables

@property
def references(self) -> list[TTableReferenceStandalone]:
"""References between tables"""
all_references: list[TTableReferenceStandalone] = []
for table_name, table in self.tables.items():
# TODO more specific error handling than ValueError
try:
parent_ref = utils.create_parent_child_reference(self.tables, table_name)
all_references.append(cast(TTableReferenceStandalone, parent_ref))
except ValueError:
pass

try:
root_ref = utils.create_root_child_reference(self.tables, table_name)
all_references.append(cast(TTableReferenceStandalone, root_ref))
except ValueError:
pass

try:
load_table_ref = utils.create_load_table_reference(
self.tables[table_name], naming=self.naming
)
all_references.append(cast(TTableReferenceStandalone, load_table_ref))
except ValueError:
pass

refs = table.get("references")
if not refs:
continue

for ref in refs:
top_level_ref: TTableReference = ref.copy()
if top_level_ref.get("table") is None:
top_level_ref["table"] = table_name

all_references.append(cast(TTableReferenceStandalone, top_level_ref))

return all_references

@property
def settings(self) -> TSchemaSettings:
return self._settings
Expand Down
65 changes: 63 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
NewType,
Union,
)
from typing_extensions import Never
from typing_extensions import Never, NotRequired, Required

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
Expand Down Expand Up @@ -51,6 +51,12 @@
# TODO add schema migration to use `_dlt_load_id` in `_dlt_loads` table
C_DLT_LOADS_TABLE_LOAD_ID = "load_id"
"""load id column in the table {LOADS_TABLE_NAME}. Meant to be joined with {C_DLT_LOAD_ID} of data tables"""
C_CHILD_PARENT_REF_LABEL = "_dlt_parent"
"""Label of the implicit `TTableReference` between a child table and its parent table"""
C_DESCENDANT_ROOT_REF_LABEL = "_dlt_root"
"""Label of the implicit `TTableReference` between a descendant table and its root table"""
C_ROOT_LOAD_REF_LABEL = "_dlt_load"
"""Label of the implicit `TTableReference` between a root table and the _dlt_loads table"""

TColumnProp = Literal[
"name",
Expand Down Expand Up @@ -276,14 +282,69 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False):
]


class TTableReference(TypedDict):
TReferenceCardinality = Literal[
"zero_to_one",
"one_to_zero",
"zero_to_many",
"many_to_zero",
"one_to_many",
"many_to_one",
"one_to_one",
"many_to_many",
]
"""Represents cardinality between `column` (left) and `referenced_column` (right)

Note that cardinality is not symmetric. For example:
- `Author, 0 to many, Book` an author can have 0 to many book
- `Book, 1 to 1, Author` a book must have exactly 1 author

The statement (Author, 0 to many, Book) doesn't imply (Book, many to 0, Author).
"""


class _TTableReferenceBase(TypedDict, total=False):
"""Describes a reference to another table's columns.
`columns` corresponds to the `referenced_columns` in the referenced table and their order should match.
"""

label: Optional[str]
"""Text providing semantic information about the reference.

For example, the label "liked" describe the relationship between `user` and `post` (user.id, "liked", post.id)
"""

cardinality: Optional[TReferenceCardinality]
"""Cardinality of the relationship between `table.column` (left) and `referenced_table.referenced_column` (right)."""

columns: Sequence[str]
"""Name of the column(s) from `table`"""

referenced_table: str
"""Name of the referenced table"""

referenced_columns: Sequence[str]
"""Name of the columns(s) from `referenced_table`"""


class TTableReferenceInline(_TTableReferenceBase, TypedDict, total=False):
table: Optional[str]
"""Name of the table.
When `TTableReference` is defined on a `TTableSchema` (i.e., "inline reference"), the `table`
value is determined by `TTableSchema["name"]`
"""


# Keep for backwards compatibility
TTableReference = TTableReferenceInline


# Compared to `TTableReference` or `TInlineTableReference`, `table` is required
class TTableReferenceStandalone(_TTableReferenceBase, TypedDict, total=False):
table: str
"""Name of the table.
When `TTableReference` is defined on a `TTableSchema` (i.e., "inline reference"), the `table`
value is determined by `TTableSchema["name"]`
"""


TTableReferenceParam = Sequence[TTableReference]
Expand Down
132 changes: 106 additions & 26 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hashlib
import warnings
import yaml
from argparse import Namespace
from copy import deepcopy, copy
from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union

Expand All @@ -21,6 +22,9 @@
C_DLT_ID,
C_DLT_LOAD_ID,
C_DLT_LOADS_TABLE_LOAD_ID,
C_CHILD_PARENT_REF_LABEL,
C_DESCENDANT_ROOT_REF_LABEL,
C_ROOT_LOAD_REF_LABEL,
SCHEMA_ENGINE_VERSION,
LOADS_TABLE_NAME,
SIMPLE_REGEX_PREFIX,
Expand Down Expand Up @@ -980,12 +984,35 @@ def create_root_child_reference(tables: TSchemaTables, table_name: str) -> TTabl
root_row_key: str = get_first_column_name_with_prop(root_table, "row_key")

return TTableReference(
label=C_DESCENDANT_ROOT_REF_LABEL,
cardinality="many_to_one",
table=table_name,
columns=[child_root_key],
referenced_table=root_table["name"],
referenced_columns=[root_row_key],
)


def get_all_root_child_references_from_root(
tables: TSchemaTables, table_name: str
) -> list[TTableReference]:
root_table = tables.get(table_name)
if is_nested_table(root_table) is True:
raise ValueError(f"Table `{table_name}` is not a root table.")

children_refs = []
# skip the first table in chain, which is the root; i.e., the current one
for child_table in get_nested_tables(tables, table_name)[1:]:
# try/except because a root table may or may not have child with `root_key` enabled
try:
child_ref = create_root_child_reference(tables, child_table["name"])
children_refs.append(child_ref)
except ValueError:
pass

return children_refs


def create_parent_child_reference(tables: TSchemaTables, table_name: str) -> TTableReference:
"""Create a Reference between `{table}.{parent_key}` and `{parent}.{row_key}`"""
child_table = tables.get(table_name)
Expand All @@ -996,62 +1023,115 @@ def create_parent_child_reference(tables: TSchemaTables, table_name: str) -> TTa

parent_table_name = child_table.get("parent")
if parent_table_name is None:
raise ValueError(f"No parent table found for `{table_name=:}`")
raise ValueError(f"Table `{table_name}` is a root table and has no parent.")
parent_table = tables.get(parent_table_name)

child_parent_key: str = get_first_column_name_with_prop(child_table, "parent_key")
parent_row_key: str = get_first_column_name_with_prop(parent_table, "row_key")

return TTableReference(
label=C_CHILD_PARENT_REF_LABEL,
cardinality="many_to_one",
table=table_name,
columns=[child_parent_key],
referenced_table=parent_table_name,
referenced_columns=[parent_row_key],
)


def create_load_table_reference(table: TTableSchema) -> TTableReference:
"""Create a Reference between `{table}._dlt_oad_id` and `_dlt_loads.load_id`"""
if table["columns"].get(C_DLT_LOAD_ID) is None:
raise ValueError(f"Column `{C_DLT_LOAD_ID}` not found for `table_name={table['name']}`")
def get_all_parent_child_references_from_root(
tables: TSchemaTables, table_name: str
) -> list[TTableReference]:
root_table = tables.get(table_name)
if is_nested_table(root_table) is True:
raise ValueError(f"Table `{table_name}` is not a root table.")

children_refs = []
# skip the first table in chain, which is the root; i.e., the current one
for child_table in get_nested_tables(tables, table_name)[1:]:
# try/except because a root table may or may not have child with `root_key` enabled
try:
child_ref = create_parent_child_reference(tables, child_table["name"])
children_refs.append(child_ref)
except ValueError:
pass

return children_refs


def create_load_table_reference(
table: TTableSchema, *, naming: NamingConvention = None
) -> TTableReference:
"""Create a Reference between `{table}._dlt_load_id` and `_dlt_loads.load_id`"""
# TODO temporary solution; refactor caller to always explicitly pass `naming`
naming = naming if naming else Namespace(normalize_identifier=lambda x: x) # type: ignore[assignment]

load_id_column = naming.normalize_identifier(C_DLT_LOAD_ID)
if table["columns"].get(load_id_column) is None:
raise ValueError(
f"Table `{table['name']}` is not a root table and has no `{load_id_column}` column."
)

return TTableReference(
columns=[C_DLT_LOAD_ID],
referenced_table=LOADS_TABLE_NAME,
referenced_columns=[C_DLT_LOADS_TABLE_LOAD_ID],
label=C_ROOT_LOAD_REF_LABEL,
cardinality="many_to_one",
table=table["name"],
columns=[load_id_column],
referenced_table=naming.normalize_identifier(LOADS_TABLE_NAME),
referenced_columns=[naming.normalize_identifier(C_DLT_LOADS_TABLE_LOAD_ID)],
)


def create_version_and_loads_hash_reference(tables: TSchemaTables) -> TTableReference:
if VERSION_TABLE_NAME not in tables:
def create_version_and_loads_hash_reference(
tables: TSchemaTables, *, naming: NamingConvention = None
) -> TTableReference:
# TODO temporary solution; refactor caller to always explicitly pass `naming`
naming = naming if naming else Namespace(normalize_identifier=lambda x: x) # type: ignore[assignment]

version_table_name = naming.normalize_identifier(VERSION_TABLE_NAME)
if version_table_name not in tables:
raise ValueError(
f"Table `{VERSION_TABLE_NAME}` not found in tables: `{list(tables.keys())}`"
f"Table `{version_table_name}` not found in tables: `{list(tables.keys())}`"
)

if LOADS_TABLE_NAME not in tables:
raise ValueError(f"Table `{LOADS_TABLE_NAME}` not found in tables: `{list(tables.keys())}`")
load_table_name = naming.normalize_identifier(LOADS_TABLE_NAME)
if load_table_name not in tables:
raise ValueError(f"Table `{load_table_name}` not found in tables: `{list(tables.keys())}`")

return TTableReference(
columns=["version_hash"],
referenced_table=LOADS_TABLE_NAME,
referenced_columns=["schema_version_hash"],
label="_dlt_schema_version",
cardinality="one_to_many",
table=version_table_name,
columns=[naming.normalize_identifier("version_hash")],
referenced_table=load_table_name,
referenced_columns=[naming.normalize_identifier("schema_version_hash")],
)


def create_version_and_loads_schema_name_reference(tables: TSchemaTables) -> TTableReference:
if VERSION_TABLE_NAME not in tables:
def create_version_and_loads_schema_name_reference(
tables: TSchemaTables, *, naming: NamingConvention = None
) -> TTableReference:
# TODO temporary solution; refactor caller to always explicitly pass `naming`
naming = naming if naming else Namespace(normalize_identifier=lambda x: x) # type: ignore[assignment]

version_table_name = naming.normalize_identifier(VERSION_TABLE_NAME)
if version_table_name not in tables:
raise ValueError(
f"Table `{VERSION_TABLE_NAME}` not found in tables: `{list(tables.keys())}`"
f"Table `{version_table_name}` not found in tables: `{list(tables.keys())}`"
)

if LOADS_TABLE_NAME not in tables:
raise ValueError(f"Table `{LOADS_TABLE_NAME}` not found in tables: `{list(tables.keys())}`")
load_table_name = naming.normalize_identifier(LOADS_TABLE_NAME)
if load_table_name not in tables:
raise ValueError(f"Table `{load_table_name}` not found in tables: `{list(tables.keys())}`")

loads_and_version_hash_schema_name_ref = TTableReference(
columns=["schema_name"],
referenced_table=LOADS_TABLE_NAME,
referenced_columns=["schema_name"],
return TTableReference(
label="_dlt_schema_name",
cardinality="many_to_many",
table=version_table_name,
columns=[naming.normalize_identifier("schema_name")],
referenced_table=load_table_name,
referenced_columns=[naming.normalize_identifier("schema_name")],
)
return loads_and_version_hash_schema_name_ref


def migrate_complex_types(table: TTableSchema, warn: bool = False) -> None:
Expand Down
3 changes: 2 additions & 1 deletion dlt/helpers/dbml.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ def _to_dbml_reference(
def _from_dbml_reference(reference: Reference) -> TTableReference:
"""Convert a DBML reference to a dlt table reference"""
return TTableReference(
referenced_table=reference.col2[0].table.name,
table=reference.table1.name,
referenced_table=reference.table2.name,
columns=[col.name for col in reference.col1],
referenced_columns=[col.name for col in reference.col2],
)
Expand Down
Loading
Loading