Skip to content

Commit

Permalink
Merge pull request #807 from DalgoT4D/805-edit-operation-bug
Browse files Browse the repository at this point in the history
made sure the changes propagate and edit doens't lose the connection to source
  • Loading branch information
fatchat authored Aug 10, 2024
2 parents 7624de6 + 781f503 commit 44d7678
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 18 deletions.
19 changes: 10 additions & 9 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,11 @@ def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload

target_model = dbt_operation.dbtmodel

current_operations_chained = OrgDbtOperation.objects.filter(
dbtmodel=target_model
).count()
all_ops = OrgDbtOperation.objects.filter(dbtmodel=target_model).all()
operation_chained_before = sum(1 for op in all_ops if op.seq < dbt_operation.seq)

final_config, all_input_models = validate_operation_config(
payload, target_model, is_multi_input_op, current_operations_chained, edit=True
payload, target_model, is_multi_input_op, operation_chained_before, edit=True
)

# create edges only with tables/models if not present
Expand All @@ -321,7 +320,6 @@ def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload
output_cols = dbtautomation_service.get_output_cols_for_operation(
org_warehouse, payload.op_type, final_config["config"].copy()
)
logger.info("updating operation")

dbt_operation.config = final_config
dbt_operation.output_cols = output_cols
Expand All @@ -333,13 +331,16 @@ def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload
target_model.output_cols = dbt_operation.output_cols
target_model.save()

# update the dbt model if it is already created on disk
if target_model.name is not None:
dbtautomation_service.update_dbt_model_in_project(org_warehouse, target_model)
dbtautomation_service.update_dbt_model_in_project(org_warehouse, target_model)

# propogate the udpates down the chain
dbtautomation_service.propagate_changes_to_downstream_operations(
target_model, dbt_operation, depth=1
)

logger.info("updated output cols for the target model")

return from_orgdbtoperation(dbt_operation, chain_length=current_operations_chained)
return from_orgdbtoperation(dbt_operation, chain_length=len(all_ops))


@transformapi.get(
Expand Down
32 changes: 32 additions & 0 deletions ddpui/core/dbtautomation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,38 @@ def delete_dbt_model_in_project(orgdbt_model: OrgDbtModel):
return True


def propagate_changes_to_downstream_operations(
target_model: OrgDbtModel, updated_operation: OrgDbtOperation, depth: int = 1
):
"""
- Propagate changes of an update in OrgDbtOperation downstream to all operations that build the target OrgDbtModel
- Propagating changes mean making sure the output of the updated operation i.e. output_cols are available as source_columns to next operations
- By default the depth is 1 i.e. it will only update the next operation
"""

if depth == 0:
logger.info("Terminating propagation as depth is 0")
return

next_op = OrgDbtOperation.objects.filter(
dbtmodel=target_model, seq=updated_operation.seq + 1
).first()

if not next_op:
logger.info("No downstream operations left to propagate changes")
return

config = next_op.config # {"type": .. , "config": {}, "input_models": [...]}
op_config = config.get("config", {})
if "source_columns" in op_config:
op_config["source_columns"] = updated_operation.output_cols

next_op.config = config
next_op.save()

propagate_changes_to_downstream_operations(target_model, next_op, depth - 1)


@app.task(bind=True)
def sync_sources_for_warehouse(
self, org_dbt_id: str, org_warehouse_id: str, orgslug: str
Expand Down
17 changes: 8 additions & 9 deletions ddpui/core/transformfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def validate_operation_config(
payload: Union[CreateDbtModelPayload, EditDbtOperationPayload],
target_model: OrgDbtModel,
is_multi_input_op: bool,
current_operations_chained: int = 0,
operation_chained_before: int = 0,
edit: bool = False,
):
"""
Expand All @@ -31,19 +31,15 @@ def validate_operation_config(
return [final_config, all_input_models]
"""

if edit:
current_operations_chained -= 1
# if edit:
# current_operations_chained -= 1

primary_input_model: OrgDbtModel = None # the first input model
other_input_models: list[OrgDbtModel] = []
seq: list[int] = []
other_input_columns: list[list[str]] = []

logger.info(
f"Operations chained for the target model {target_model.uuid} : {current_operations_chained}"
)

if current_operations_chained == 0:
if operation_chained_before == 0:
if not payload.input_uuid:
raise HttpError(422, "input is required")

Expand Down Expand Up @@ -71,7 +67,10 @@ def validate_operation_config(
[primary_input_model] if primary_input_model else []
) + other_input_models

logger.info("passed all validation; moving to create operation")
logger.info(
"passed all validation; moving to %s operation",
{"update" if edit else "create"},
)

# source columns or selected columns
# there will be atleast one input
Expand Down

0 comments on commit 44d7678

Please sign in to comment.