Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dlt.common.schema.typing import (
C_DLT_ID,
C_DLT_LOAD_ID,
DLT_NAME_PREFIX,
C_DLT_LOADS_TABLE_LOAD_ID,
SCHEMA_ENGINE_VERSION,
LOADS_TABLE_NAME,
Expand Down Expand Up @@ -890,6 +891,52 @@ def get_data_and_dlt_tables(tables: TSchemaTables) -> tuple[list[TTableSchema],
return data_tables, dlt_tables


def changes_without_dlt_changes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name was correct in context of callback function. here it just removed "dlt entities" from a list of tables. so (1) it needs a different name (2) it makes sense to split it into many functions. see my general review

updates: Dict[str, TTableSchema],
exclude_dlt_tables: bool = True,
exclude_dlt_columns: bool = True,
dlt_prefix: str = DLT_NAME_PREFIX,
) -> Dict[str, TTableSchema]:
"""
Convenience method to return a shallow copy of the updates dict with all dlt-tables and/or
dlt-columns removed.

Args:
updates: The updates made to all tables in the schema, e.g. from trace load packages
exclude_dlt_tables: If True, remove tables whose name starts with given dlt_tables_prefix
exclude_dlt_columns: If True, remove columns whose name starts with given dlt_column_prefix
dlt_prefix: by which to detect if a table or column is dlt internal
Returns:
Filtered dict with the same structure as input.

Note: dlt supports changing the default prefix, see schema._dlt_tables_prefix attribute to get
the source of truth for your schema
"""

filtered_tables: Dict[str, TTableSchema] = {}
for table_name, table_schema in updates.items():
if exclude_dlt_tables and table_name.startswith(dlt_prefix):
continue

# Create a copy of the table schema, preserving all fields except columns
new_table_schema = cast(
TTableSchema, {k: v for k, v in table_schema.items() if k != "columns"}
)

if "columns" in table_schema:
if exclude_dlt_columns:
new_table_schema["columns"] = {
col: col_def
for col, col_def in table_schema["columns"].items()
if not col.startswith(dlt_prefix)
}
else:
new_table_schema["columns"] = table_schema["columns"]

filtered_tables[table_name] = new_table_schema
return filtered_tables


def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
"""Finds root (without parent) of a `table_name` following the nested references (row_key - parent_key)."""
table = tables[table_name]
Expand Down
Loading