From ab538b78da51a3cc92fd4734e0116e9cf87d782b Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 26 Mar 2024 13:32:05 +0530 Subject: [PATCH 1/4] refactor create operation api --- ddpui/api/transform_api.py | 80 ++------------------- ddpui/core/transformfunctions.py | 104 +++++++++++++++++++++++++++ ddpui/schemas/dbt_workflow_schema.py | 12 ++++ 3 files changed, 122 insertions(+), 74 deletions(-) create mode 100644 ddpui/core/transformfunctions.py diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index cec1fc11..2bd1a5f7 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -22,7 +22,9 @@ from ddpui.schemas.dbt_workflow_schema import ( CreateDbtModelPayload, CompleteDbtModelPayload, + EditDbtOperationPayload, ) +from ddpui.core.transformfunctions import validate_operation_config from ddpui.core import dbtautomation_service from ddpui.core.dbtautomation_service import sync_sources_for_warehouse @@ -196,43 +198,10 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload): dbtmodel=target_model ).count() - primary_input_model: OrgDbtModel = None # the first input model - other_input_models: list[OrgDbtModel] = [] - seq: list[int] = [] - other_input_columns: list[list[str]] = [] - - logger.info( - f"Operations chained for the target model {target_model.uuid} : {current_operations_chained}" + final_config, all_input_models = validate_operation_config( + payload, target_model, is_multi_input_op, current_operations_chained ) - if current_operations_chained == 0: - if not payload.input_uuid: - raise HttpError(422, "input is required") - - model = OrgDbtModel.objects.filter(uuid=payload.input_uuid).first() - if not model: - raise HttpError(404, "input not found") - - primary_input_model = model - - if is_multi_input_op: # multi input operation - if len(payload.other_inputs) == 0: - raise HttpError(422, "atleast 2 inputs are required for this operation") - - payload.other_inputs.sort(key=lambda x: x.seq) - - for other_input in payload.other_inputs: - model = OrgDbtModel.objects.filter(uuid=other_input.uuid).first() - if not model: - raise HttpError(404, "input not found") - seq.append(other_input.seq) - other_input_columns.append(other_input.columns) - other_input_models.append(model) - - all_input_models = ( - [primary_input_model] if primary_input_model else [] - ) + other_input_models - # we create edges only with tables/models at the start of the chain & not operation nodes if current_operations_chained == 0: for source in all_input_models: @@ -245,45 +214,8 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload): to_node=target_model, ) - logger.info("passed all validation; moving to create operation") - - # source columns or selected columns - # there will be atleast one input - OP_CONFIG = payload.config - OP_CONFIG["source_columns"] = payload.source_columns - OP_CONFIG["other_inputs"] = [] - - # in case of mutli input; send the rest of the inputs in the config; dbt_automation will handle the rest - for dbtmodel, seq, columns in zip(other_input_models, seq, other_input_columns): - OP_CONFIG["other_inputs"].append( - { - "input": { - "input_type": dbtmodel.type, - "input_name": dbtmodel.name, - "source_name": dbtmodel.source_name, - }, - "source_columns": columns, - "seq": seq, - } - ) - - input_config = { - "config": OP_CONFIG, - "type": payload.op_type, - "input_models": [ - { - "uuid": str(model.uuid), - "name": model.name, - "display_name": model.display_name, - "source_name": model.source_name, - "schema": model.schema, - "type": model.type, - } - for model in all_input_models - ], - } output_cols = dbtautomation_service.get_output_cols_for_operation( - org_warehouse, payload.op_type, OP_CONFIG.copy() + org_warehouse, payload.op_type, final_config["config"].copy() ) logger.info("creating operation") @@ -291,7 +223,7 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload): dbtmodel=target_model, uuid=uuid.uuid4(), seq=current_operations_chained + 1, - config=input_config, + config=final_config, output_cols=output_cols, ) diff --git a/ddpui/core/transformfunctions.py b/ddpui/core/transformfunctions.py new file mode 100644 index 00000000..cf231bb9 --- /dev/null +++ b/ddpui/core/transformfunctions.py @@ -0,0 +1,104 @@ +from ninja.errors import HttpError +from ddpui.models.dbt_workflow import OrgDbtModel, DbtEdge, OrgDbtOperation +from ddpui.utils.custom_logger import CustomLogger +from ddpui.schemas.dbt_workflow_schema import ( + CreateDbtModelPayload, +) + +logger = CustomLogger("ddpui") + + +def validate_operation_config( + payload: CreateDbtModelPayload, + target_model: OrgDbtModel, + is_multi_input_op: bool, + current_operations_chained: int = 0, + edit: bool = False, +): + """ + - Validate if the operation config has correct input(s) + - Raises http errors if validation fails + - Return ordered list of input models and the final config of operation + + return [final_config, all_input_models] + """ + + if edit: + current_operations_chained -= 1 + + primary_input_model: OrgDbtModel = None # the first input model + other_input_models: list[OrgDbtModel] = [] + seq: list[int] = [] + other_input_columns: list[list[str]] = [] + + logger.info( + f"Operations chained for the target model {target_model.uuid} : {current_operations_chained}" + ) + + if current_operations_chained == 0: + if not payload.input_uuid: + raise HttpError(422, "input is required") + + model = OrgDbtModel.objects.filter(uuid=payload.input_uuid).first() + if not model: + raise HttpError(404, "input not found") + + primary_input_model = model + + if is_multi_input_op: # multi input operation + if len(payload.other_inputs) == 0: + raise HttpError(422, "atleast 2 inputs are required for this operation") + + payload.other_inputs.sort(key=lambda x: x.seq) + + for other_input in payload.other_inputs: + model = OrgDbtModel.objects.filter(uuid=other_input.uuid).first() + if not model: + raise HttpError(404, "input not found") + seq.append(other_input.seq) + other_input_columns.append(other_input.columns) + other_input_models.append(model) + + all_input_models = ( + [primary_input_model] if primary_input_model else [] + ) + other_input_models + + logger.info("passed all validation; moving to create operation") + + # source columns or selected columns + # there will be atleast one input + OP_CONFIG = payload.config + OP_CONFIG["source_columns"] = payload.source_columns + OP_CONFIG["other_inputs"] = [] + + # in case of mutli input; send the rest of the inputs in the config; dbt_automation will handle the rest + for dbtmodel, seq, columns in zip(other_input_models, seq, other_input_columns): + OP_CONFIG["other_inputs"].append( + { + "input": { + "input_type": dbtmodel.type, + "input_name": dbtmodel.name, + "source_name": dbtmodel.source_name, + }, + "source_columns": columns, + "seq": seq, + } + ) + + input_config = { + "config": OP_CONFIG, + "type": payload.op_type, + "input_models": [ + { + "uuid": str(model.uuid), + "name": model.name, + "display_name": model.display_name, + "source_name": model.source_name, + "schema": model.schema, + "type": model.type, + } + for model in all_input_models + ], + } + + return (input_config, all_input_models) diff --git a/ddpui/schemas/dbt_workflow_schema.py b/ddpui/schemas/dbt_workflow_schema.py index 67e7203e..db8f38ac 100644 --- a/ddpui/schemas/dbt_workflow_schema.py +++ b/ddpui/schemas/dbt_workflow_schema.py @@ -24,6 +24,18 @@ class CreateDbtModelPayload(Schema): other_inputs: list[InputModelPayload] = [] +class EditDbtOperationPayload(Schema): + """ + schema to define the payload required to edit a dbt operation + """ + + config: dict + op_type: str + input_uuid: str = "" + source_columns: list[str] = [] + other_inputs: list[InputModelPayload] = [] + + class CompleteDbtModelPayload(Schema): """ schema to define the payload required to create a custom org task From d26ed8c270a54636009ffd9e7708cbc6695043bc Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 26 Mar 2024 13:44:34 +0530 Subject: [PATCH 2/4] edit operation api --- ddpui/api/transform_api.py | 106 +++++++++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 11 deletions(-) diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index 2bd1a5f7..2c5d9a45 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -202,17 +202,14 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload): payload, target_model, is_multi_input_op, current_operations_chained ) - # we create edges only with tables/models at the start of the chain & not operation nodes - if current_operations_chained == 0: - for source in all_input_models: - edge = DbtEdge.objects.filter( - from_node=source, to_node=target_model - ).first() - if not edge: - DbtEdge.objects.create( - from_node=source, - to_node=target_model, - ) + # we create edges only with tables/models + for source in all_input_models: + edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first() + if not edge: + DbtEdge.objects.create( + from_node=source, + to_node=target_model, + ) output_cols = dbtautomation_service.get_output_cols_for_operation( org_warehouse, payload.op_type, final_config["config"].copy() @@ -244,6 +241,93 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload): } +@transformapi.put( + "/dbt_project/model/operations/{operation_uuid}/", auth=auth.CanManagePipelines() +) +def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload): + """ + Update operation config + """ + orguser: OrgUser = request.orguser + org = orguser.org + + is_multi_input_op = payload.op_type in ["join", "unionall"] + + org_warehouse = OrgWarehouse.objects.filter(org=org).first() + if not org_warehouse: + raise HttpError(404, "please setup your warehouse first") + + # make sure the orgdbt here is the one we create locally + orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first() + if not orgdbt: + raise HttpError(404, "dbt workspace not setup") + + try: + uuid.UUID(str(operation_uuid)) + except ValueError: + raise HttpError(400, "operation not found") + + dbt_operation = OrgDbtOperation.objects.filter(uuid=operation_uuid).first() + if not dbt_operation: + raise HttpError(404, "operation not found") + + if dbt_operation.dbtmodel.under_construction is False: + raise HttpError(403, "model is locked") + + # allow edit of only leaf operation nodes + if ( + OrgDbtOperation.objects.filter( + dbtmodel=dbt_operation.dbtmodel, seq__gt=dbt_operation.seq + ).count() + >= 1 + ): + raise HttpError(403, "operation is locked; cannot edit") + + target_model = dbt_operation.dbtmodel + + current_operations_chained = OrgDbtOperation.objects.filter( + dbtmodel=target_model + ).count() + + final_config, all_input_models = validate_operation_config( + payload, target_model, is_multi_input_op, current_operations_chained + ) + + # create edges only with tables/models if not present + for source in all_input_models: + edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first() + if not edge: + DbtEdge.objects.create( + from_node=source, + to_node=target_model, + ) + + output_cols = dbtautomation_service.get_output_cols_for_operation( + org_warehouse, payload.op_type, final_config["config"].copy() + ) + logger.info("updating operation") + + dbt_operation.config = final_config + dbt_operation.output_cols = output_cols + dbt_operation.save() + + logger.info("updated operation") + + # save the output cols of the latest operation to the dbt model + target_model.output_cols = dbt_operation.output_cols + target_model.save() + + logger.info("updated output cols for the target model") + + return { + "id": dbt_operation.uuid, + "output_cols": dbt_operation.output_cols, + "config": dbt_operation.config, + "type": "operation_node", + "target_model_id": dbt_operation.dbtmodel.uuid, + } + + @transformapi.post( "/dbt_project/model/{model_uuid}/save/", auth=auth.CanManagePipelines() ) From f619a3e190589083ef9176261d09234bfe33cae9 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 26 Mar 2024 13:46:14 +0530 Subject: [PATCH 3/4] minor update --- ddpui/api/transform_api.py | 2 +- ddpui/core/transformfunctions.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index 2c5d9a45..679cee59 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -290,7 +290,7 @@ def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload ).count() final_config, all_input_models = validate_operation_config( - payload, target_model, is_multi_input_op, current_operations_chained + payload, target_model, is_multi_input_op, current_operations_chained, edit=True ) # create edges only with tables/models if not present diff --git a/ddpui/core/transformfunctions.py b/ddpui/core/transformfunctions.py index cf231bb9..bc514751 100644 --- a/ddpui/core/transformfunctions.py +++ b/ddpui/core/transformfunctions.py @@ -1,15 +1,17 @@ +from typing import Union from ninja.errors import HttpError from ddpui.models.dbt_workflow import OrgDbtModel, DbtEdge, OrgDbtOperation from ddpui.utils.custom_logger import CustomLogger from ddpui.schemas.dbt_workflow_schema import ( CreateDbtModelPayload, + EditDbtOperationPayload, ) logger = CustomLogger("ddpui") def validate_operation_config( - payload: CreateDbtModelPayload, + payload: Union[CreateDbtModelPayload, EditDbtOperationPayload], target_model: OrgDbtModel, is_multi_input_op: bool, current_operations_chained: int = 0, From 0bd61e840d5064fa38d9f9fe76df6ddcbea3f902 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 26 Mar 2024 13:53:43 +0530 Subject: [PATCH 4/4] get api to fetch config of an operation --- ddpui/api/transform_api.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index 679cee59..ff31a77c 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -328,6 +328,44 @@ def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload } +@transformapi.get( + "/dbt_project/model/operations/{operation_uuid}/", auth=auth.CanManagePipelines() +) +def get_operation(request, operation_uuid: str): + """ + Fetch config of operation + """ + + orguser: OrgUser = request.orguser + org = orguser.org + + org_warehouse = OrgWarehouse.objects.filter(org=org).first() + if not org_warehouse: + raise HttpError(404, "please setup your warehouse first") + + # make sure the orgdbt here is the one we create locally + orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first() + if not orgdbt: + raise HttpError(404, "dbt workspace not setup") + + try: + uuid.UUID(str(operation_uuid)) + except ValueError: + raise HttpError(400, "operation not found") + + dbt_operation = OrgDbtOperation.objects.filter(uuid=operation_uuid).first() + if not dbt_operation: + raise HttpError(404, "operation not found") + + return { + "id": dbt_operation.uuid, + "output_cols": dbt_operation.output_cols, + "config": dbt_operation.config, + "type": "operation_node", + "target_model_id": dbt_operation.dbtmodel.uuid, + } + + @transformapi.post( "/dbt_project/model/{model_uuid}/save/", auth=auth.CanManagePipelines() )