feat: Smart dlt.Relation.join() #2960
base: devel
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
this is pretty cool. Top level comments:
- I think we should have one `join` method on relation - I assumed that we support joins of any depth in my review. that impacts aliasing
- I assumed `sqlglot` is not too smart with generating unique aliases for columns and tables when joining - I linked some code that is generating fact tables. the goal is more or less the same as here
```python
else:
    raise TypeError

current_table_name = rel._sqlglot_expression.find(sge.From).name
```
will it work with multiple joins? i.e. I have nesting level 2 and I use join twice. hmmm, I think so. or are multi-joins out of scope?
This work was based on the incremental loading PRs where you suggested not to do recursive joins and only support immediate parent-child and root-child joins.
For this PR I believe we could support:

```python
"""
tables with `root_key=True`:
- orders
- orders__items
- orders__items__details
- customers  # with ref `orders.customer_id == customers.id`
"""
# parent-child
dataset.table("orders").join("orders__items")
# LEFT JOIN orders._dlt_id = orders__items._dlt_parent_id
dataset.table("orders__items").join("orders__items__details")
# LEFT JOIN orders__items._dlt_id = orders__items__details._dlt_parent_id

# root-descendant
dataset.table("orders").join("orders__items__details")
# LEFT JOIN orders._dlt_id = orders__items__details._dlt_root_id

# relationship
dataset.table("orders").join("customers")
# INNER JOIN orders.customer_id = customers.id
dataset.table("customers").join("orders")
# INNER JOIN customers.id = orders.customer_id
```

Currently excluded:

```python
# recursive parent-child
# recursive API
dataset.table("orders").join("orders__items").join("orders__items__details")
# using a list API is easier
dataset.table("orders").join(["orders__items", "orders__items__details"])
```

This has new implications:
- We're keeping columns from 3 tables instead of 2
- It's different from root-descendant because we are including intermediary parents which have no descendant rows.
- Providing convenience table/column aliases is much harder for the recursive API because we need to introspect the previous query that renamed columns
```python
raise TypeError

current_table_name = rel._sqlglot_expression.find(sge.From).name
current_table_schema = self._dataset.schema.tables[current_table_name]
```
you should take column names from the (qualified) relation's query, so you can identify which columns are coming from current_table_name. you should also recover aliases here to create column aliases in the final SELECT
this will let you create joins on relations that do not select all columns
```python
if references is None:
    raise KeyError(f"No references found on schema for table `{current_table_name}`")

other_table_name = other_rel._sqlglot_expression.find(sge.From).name
```
same thing
```python
current_cols_aliases = []
other_cols_aliases = []

for col in current_table_schema["columns"]:
```
here you are comparing normalized identifiers with non-normalized constants (C_DLT_LOAD_ID). here's how we identify dlt columns and tables in filesystem:

```python
def is_dlt_identifier(self, ident: str) -> bool:
    return ident.startswith(self.schema._dlt_tables_prefix)
```

maybe it makes sense to add it to the Schema object
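A standalone version of that check, assuming the default naming convention normalizes the dlt prefix to `_dlt` (the real code reads the normalized prefix from the schema):

```python
# assumed: normalized value of DLT_NAME_PREFIX under the default naming convention
DLT_PREFIX = "_dlt"

def is_dlt_identifier(ident: str) -> bool:
    # compare against the normalized prefix rather than raw constants
    return ident.startswith(DLT_PREFIX)

print(is_dlt_identifier("_dlt_load_id"))  # True
print(is_dlt_identifier("customer_id"))   # False
```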
```python
    return select_expr


def _create_column_alias(
```
yes this is good! but please use the naming convention to create "compound" identifiers, i.e. like below: this will split parts into single identifiers and then create a compound one (takes into account that parts may be already compound)

```python
def normalize_alias(c: DbtGeneratorConfig, *parts: str) -> str:
    return c.naming.normalize_path(c.naming.make_path(*parts))
```

```python
alias = _create_column_alias(parent_table_name, col)
parent_cols_aliases.append(alias)

_, _, child_table_part = child_table_name.removeprefix(parent_table_name).partition("__")
```
you should use break_path and make_path in schema.naming.
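A sketch of what `break_path`/`make_path` do, assuming the default `__` nesting separator (the real implementations live on `schema.naming` and also normalize each part):

```python
SEPARATOR = "__"  # dlt's default nesting separator (assumed here)

def break_path(path: str) -> list[str]:
    # split a compound identifier into its single-identifier parts
    return [p for p in path.split(SEPARATOR) if p]

def make_path(*parts: str) -> str:
    # join parts back into a compound identifier
    return SEPARATOR.join(parts)

# safer than removeprefix + partition on the raw string:
parts = break_path("orders__items__details")
print(parts)                  # ['orders', 'items', 'details']
print(make_path(*parts[1:]))  # 'items__details'
```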
```python
parent_cols_aliases = []
for col in parent_table_schema["columns"]:
    if col in ["_dlt_id", "_dlt_load_id"]:
```
please compare only normalized identifiers. it may make sense to cache _dlt_id and _dlt_load_id in schema along a few others that are already there:

```python
self._dlt_tables_prefix = to_naming.normalize_table_identifier(DLT_NAME_PREFIX)
self.version_table_name = to_naming.normalize_table_identifier(VERSION_TABLE_NAME)
self.loads_table_name = to_naming.normalize_table_identifier(LOADS_TABLE_NAME)
self.state_table_name = to_naming.normalize_table_identifier(PIPELINE_STATE_TABLE_NAME)
```

```python
        *other_cols_aliases,
    )
    .from_(current_table_name)
    .join(other_table_name, on=join_condition, join_type=how)
```
what happens if the other relation already has a join? i.e. users.join(orders.join(items))? is sqlglot able to generate proper aliasing itself? if not you'll need to assign aliases to the tables, i.e. orders__items. the same table may be joined several times. also you remove prefixes from nested tables; that may create conflicts.
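The prefix-stripping conflict is easy to check for before building the SELECT list. A small illustrative helper (not dlt API) that flags output names which would become ambiguous:

```python
from collections import Counter

def find_alias_collisions(aliases: list[str]) -> list[str]:
    # any alias appearing more than once would produce an ambiguous SELECT list
    return [a for a, n in Counter(aliases).items() if n > 1]

# e.g. stripping the table prefix from both orders.name and
# orders__items.name leaves two bare "name" columns
aliases = ["orders__id", "name", "name"]
print(find_alias_collisions(aliases))  # ['name']
```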
```python
child_cols_aliases.append(alias)

meta_cols_aliases = (
    _create_column_alias(parent_table_name, "_dlt_load_id", prefix="", separator=""),
```
this is done so query looks nicer?
```python
rel._sqlglot_expression = rel._sqlglot_expression.where(condition)
return rel


def join(
```
We already have a join generator which works pretty well for generating dbt packages. It supports both regular and nested references and also joins of unlimited depth (via table and column aliasing). Maybe some code or ideas could be helpful here?

here we render the fact table: https://github.com/dlt-hub/dlt-plus/blob/devel/packages/dlt_plus/dlt_plus/dbt_generator/render.py#L72

this procedure renders a fact table (indicated by config): it will create all possible joins (regular and nested), also recursively up to a certain depth. your case is simpler (just one join) but the resultant relation should allow adding more joins (both on the same level and joins of joins). or is this out of scope?

- first we collect all references, validate them, and convert nested (automated) references into regular references (get_rendered_tables)
- then required tables are added; here the crucial part is how aliasing is done. all tables except the top (fact) table will be aliased uniquely. see https://github.com/dlt-hub/dlt-plus/blob/devel/packages/dlt_plus/dlt_plus/dbt_generator/render.py#L85 and https://github.com/dlt-hub/dlt-plus/blob/devel/packages/dlt_plus/dlt_plus/dbt_generator/render.py#L113
- all columns that are added to the joined relation are uniquely aliased as well: https://github.com/dlt-hub/dlt-plus/blob/devel/packages/dlt_plus/dlt_plus/dbt_generator/render.py#L143. table aliases, not names, are used to make sure that columns in the SELECT list are unique
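The "aliased uniquely" idea can be sketched with a counter that lets the same table appear several times in one query. This is assumed logic for illustration, not the actual render.py code:

```python
def make_table_aliases(tables: list[str]) -> list[str]:
    """Assign a unique alias to each joined table occurrence."""
    seen: dict[str, int] = {}
    aliases: list[str] = []
    for t in tables:
        n = seen.get(t, 0)
        seen[t] = n + 1
        # first occurrence keeps the name, repeats get a numeric suffix
        aliases.append(t if n == 0 else f"{t}_{n}")
    return aliases

print(make_table_aliases(["orders", "orders__items", "orders__items"]))
# → ['orders', 'orders__items', 'orders__items_1']
```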
This adds convenience methods `dlt.Relation(...).join()` and `dlt.Relation(...).join_child(...)` to simplify joins between tables that have a `TTableReference` on `dlt.Schema`.

Design decisions
- `LEFT` joins for parent-child and root-descendant; `INNER` for relationship references, but could be any type.
- `.join()` can receive:
  - a `dlt.Relation` to join on (if the column is unambiguous)
  - `"table_name"` (if the column is unambiguous)
  - `"table_name.column_name"` in case of multiple references between tables

Open questions
- should we keep `.join()` and `.join_parent_child()` separately?
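The disambiguation rule above could resolve like this. A hypothetical sketch: references are modeled as `(table, column, referenced_table, referenced_column)` tuples, and the optional `column` in `"table_name.column_name"` is assumed to name the local column that carries the reference.

```python
def resolve_reference(refs, other: str):
    """Resolve a `.join()` string argument: "table" or "table.column"."""
    table, _, column = other.partition(".")
    matches = [r for r in refs if r[2] == table and (not column or r[1] == column)]
    if not matches:
        raise KeyError(f"no reference to `{table}`")
    if len(matches) > 1:
        raise ValueError(f"ambiguous references to `{table}`, use 'table.column'")
    return matches[0]

refs = [
    ("orders", "customer_id", "customers", "id"),
    ("orders", "billed_to_id", "customers", "id"),
]
# two references exist, so the column form is required
print(resolve_reference(refs, "customers.customer_id"))
# → ('orders', 'customer_id', 'customers', 'id')
```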