Skip to content

Commit

Permalink
Merge pull request #524 from DalgoT4D/edit-operations
Browse files Browse the repository at this point in the history
Edit operations
  • Loading branch information
fatchat authored Mar 28, 2024
2 parents c31520a + 0bd61e8 commit c556984
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 85 deletions.
224 changes: 139 additions & 85 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from ddpui.schemas.dbt_workflow_schema import (
CreateDbtModelPayload,
CompleteDbtModelPayload,
EditDbtOperationPayload,
)
from ddpui.core.transformfunctions import validate_operation_config

from ddpui.core import dbtautomation_service
from ddpui.core.dbtautomation_service import sync_sources_for_warehouse
Expand Down Expand Up @@ -196,102 +198,29 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload):
dbtmodel=target_model
).count()

primary_input_model: OrgDbtModel = None # the first input model
other_input_models: list[OrgDbtModel] = []
seq: list[int] = []
other_input_columns: list[list[str]] = []

logger.info(
f"Operations chained for the target model {target_model.uuid} : {current_operations_chained}"
final_config, all_input_models = validate_operation_config(
payload, target_model, is_multi_input_op, current_operations_chained
)

if current_operations_chained == 0:
if not payload.input_uuid:
raise HttpError(422, "input is required")

model = OrgDbtModel.objects.filter(uuid=payload.input_uuid).first()
if not model:
raise HttpError(404, "input not found")

primary_input_model = model

if is_multi_input_op: # multi input operation
if len(payload.other_inputs) == 0:
raise HttpError(422, "atleast 2 inputs are required for this operation")

payload.other_inputs.sort(key=lambda x: x.seq)

for other_input in payload.other_inputs:
model = OrgDbtModel.objects.filter(uuid=other_input.uuid).first()
if not model:
raise HttpError(404, "input not found")
seq.append(other_input.seq)
other_input_columns.append(other_input.columns)
other_input_models.append(model)

all_input_models = (
[primary_input_model] if primary_input_model else []
) + other_input_models

# we create edges only with tables/models at the start of the chain & not operation nodes
if current_operations_chained == 0:
for source in all_input_models:
edge = DbtEdge.objects.filter(
from_node=source, to_node=target_model
).first()
if not edge:
DbtEdge.objects.create(
from_node=source,
to_node=target_model,
)

logger.info("passed all validation; moving to create operation")

# source columns or selected columns
# there will be atleast one input
OP_CONFIG = payload.config
OP_CONFIG["source_columns"] = payload.source_columns
OP_CONFIG["other_inputs"] = []

# in case of mutli input; send the rest of the inputs in the config; dbt_automation will handle the rest
for dbtmodel, seq, columns in zip(other_input_models, seq, other_input_columns):
OP_CONFIG["other_inputs"].append(
{
"input": {
"input_type": dbtmodel.type,
"input_name": dbtmodel.name,
"source_name": dbtmodel.source_name,
},
"source_columns": columns,
"seq": seq,
}
)
# we create edges only with tables/models
for source in all_input_models:
edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first()
if not edge:
DbtEdge.objects.create(
from_node=source,
to_node=target_model,
)

input_config = {
"config": OP_CONFIG,
"type": payload.op_type,
"input_models": [
{
"uuid": str(model.uuid),
"name": model.name,
"display_name": model.display_name,
"source_name": model.source_name,
"schema": model.schema,
"type": model.type,
}
for model in all_input_models
],
}
output_cols = dbtautomation_service.get_output_cols_for_operation(
org_warehouse, payload.op_type, OP_CONFIG.copy()
org_warehouse, payload.op_type, final_config["config"].copy()
)
logger.info("creating operation")

dbt_op = OrgDbtOperation.objects.create(
dbtmodel=target_model,
uuid=uuid.uuid4(),
seq=current_operations_chained + 1,
config=input_config,
config=final_config,
output_cols=output_cols,
)

Expand All @@ -312,6 +241,131 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload):
}


@transformapi.put(
    "/dbt_project/model/operations/{operation_uuid}/", auth=auth.CanManagePipelines()
)
def put_operation(request, operation_uuid: str, payload: EditDbtOperationPayload):
    """
    Update the config of an existing operation node.

    Only the last (leaf) operation of a model under construction may be
    edited. Revalidates the operation's inputs, recomputes the output
    columns via the warehouse, saves the operation, and propagates the
    new output columns to the target model.

    Raises HttpError 404 (missing warehouse / workspace / operation),
    400 (malformed uuid), or 403 (model locked / non-leaf operation).
    """
    orguser: OrgUser = request.orguser
    org = orguser.org

    # join/unionall operations take more than one input model
    is_multi_input_op = payload.op_type in ["join", "unionall"]

    org_warehouse = OrgWarehouse.objects.filter(org=org).first()
    if not org_warehouse:
        raise HttpError(404, "please setup your warehouse first")

    # make sure the orgdbt here is the one we create locally
    orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
    if not orgdbt:
        raise HttpError(404, "dbt workspace not setup")

    try:
        uuid.UUID(str(operation_uuid))
    except ValueError as err:
        raise HttpError(400, "operation not found") from err

    # scope the lookup to this org's dbt workspace so a user cannot edit
    # another org's operation by guessing its uuid
    dbt_operation = OrgDbtOperation.objects.filter(
        uuid=operation_uuid, dbtmodel__orgdbt=orgdbt
    ).first()
    if not dbt_operation:
        raise HttpError(404, "operation not found")

    if dbt_operation.dbtmodel.under_construction is False:
        raise HttpError(403, "model is locked")

    # allow edit of only leaf operation nodes; editing an intermediate
    # operation would invalidate everything chained after it
    if (
        OrgDbtOperation.objects.filter(
            dbtmodel=dbt_operation.dbtmodel, seq__gt=dbt_operation.seq
        ).count()
        >= 1
    ):
        raise HttpError(403, "operation is locked; cannot edit")

    target_model = dbt_operation.dbtmodel

    current_operations_chained = OrgDbtOperation.objects.filter(
        dbtmodel=target_model
    ).count()

    final_config, all_input_models = validate_operation_config(
        payload, target_model, is_multi_input_op, current_operations_chained, edit=True
    )

    # create edges only with tables/models if not present
    for source in all_input_models:
        edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first()
        if not edge:
            DbtEdge.objects.create(
                from_node=source,
                to_node=target_model,
            )

    output_cols = dbtautomation_service.get_output_cols_for_operation(
        org_warehouse, payload.op_type, final_config["config"].copy()
    )
    logger.info("updating operation")

    dbt_operation.config = final_config
    dbt_operation.output_cols = output_cols
    dbt_operation.save()

    logger.info("updated operation")

    # save the output cols of the latest operation to the dbt model
    target_model.output_cols = dbt_operation.output_cols
    target_model.save()

    logger.info("updated output cols for the target model")

    return {
        "id": dbt_operation.uuid,
        "output_cols": dbt_operation.output_cols,
        "config": dbt_operation.config,
        "type": "operation_node",
        "target_model_id": dbt_operation.dbtmodel.uuid,
    }


@transformapi.get(
    "/dbt_project/model/operations/{operation_uuid}/", auth=auth.CanManagePipelines()
)
def get_operation(request, operation_uuid: str):
    """
    Fetch the config of a single operation node.

    Raises HttpError 404 (missing warehouse / workspace / operation)
    or 400 (malformed uuid).
    """

    orguser: OrgUser = request.orguser
    org = orguser.org

    org_warehouse = OrgWarehouse.objects.filter(org=org).first()
    if not org_warehouse:
        raise HttpError(404, "please setup your warehouse first")

    # make sure the orgdbt here is the one we create locally
    orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
    if not orgdbt:
        raise HttpError(404, "dbt workspace not setup")

    try:
        uuid.UUID(str(operation_uuid))
    except ValueError as err:
        raise HttpError(400, "operation not found") from err

    # scope the lookup to this org's dbt workspace so a user cannot read
    # another org's operation by guessing its uuid
    dbt_operation = OrgDbtOperation.objects.filter(
        uuid=operation_uuid, dbtmodel__orgdbt=orgdbt
    ).first()
    if not dbt_operation:
        raise HttpError(404, "operation not found")

    return {
        "id": dbt_operation.uuid,
        "output_cols": dbt_operation.output_cols,
        "config": dbt_operation.config,
        "type": "operation_node",
        "target_model_id": dbt_operation.dbtmodel.uuid,
    }


@transformapi.post(
"/dbt_project/model/{model_uuid}/save/", auth=auth.CanManagePipelines()
)
Expand Down
106 changes: 106 additions & 0 deletions ddpui/core/transformfunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import Union
from ninja.errors import HttpError
from ddpui.models.dbt_workflow import OrgDbtModel, DbtEdge, OrgDbtOperation
from ddpui.utils.custom_logger import CustomLogger
from ddpui.schemas.dbt_workflow_schema import (
CreateDbtModelPayload,
EditDbtOperationPayload,
)

logger = CustomLogger("ddpui")


def validate_operation_config(
    payload: Union[CreateDbtModelPayload, EditDbtOperationPayload],
    target_model: OrgDbtModel,
    is_multi_input_op: bool,
    current_operations_chained: int = 0,
    edit: bool = False,
):
    """
    Validate the input(s) of an operation payload and assemble its final config.

    - Raises HttpError 422/404 if the payload's inputs are missing or unknown
    - Returns a tuple ``(final_config, all_input_models)`` where
      ``all_input_models`` is the ordered list of input OrgDbtModels:
      the primary input (if any) first, then the other inputs sorted by seq
    """

    if edit:
        # the operation being edited is itself part of the chain; exclude it
        # so the "first operation in the chain" check below behaves as on create
        current_operations_chained -= 1

    primary_input_model: OrgDbtModel = None  # the first input model
    other_input_models: list[OrgDbtModel] = []
    seqs: list[int] = []
    other_input_columns: list[list[str]] = []

    logger.info(
        f"Operations chained for the target model {target_model.uuid} : {current_operations_chained}"
    )

    # only the first operation in a chain reads directly from a table/model;
    # subsequent operations consume the previous operation's output
    if current_operations_chained == 0:
        if not payload.input_uuid:
            raise HttpError(422, "input is required")

        model = OrgDbtModel.objects.filter(uuid=payload.input_uuid).first()
        if not model:
            raise HttpError(404, "input not found")

        primary_input_model = model

    if is_multi_input_op:  # multi input operation
        if len(payload.other_inputs) == 0:
            raise HttpError(422, "atleast 2 inputs are required for this operation")

        payload.other_inputs.sort(key=lambda x: x.seq)

        for other_input in payload.other_inputs:
            model = OrgDbtModel.objects.filter(uuid=other_input.uuid).first()
            if not model:
                raise HttpError(404, "input not found")
            seqs.append(other_input.seq)
            other_input_columns.append(other_input.columns)
            other_input_models.append(model)

    all_input_models = (
        [primary_input_model] if primary_input_model else []
    ) + other_input_models

    logger.info("passed all validation; moving to create operation")

    # source columns or selected columns
    # there will be atleast one input
    # shallow-copy so we don't mutate the caller's payload.config in place
    op_config = dict(payload.config)
    op_config["source_columns"] = payload.source_columns
    op_config["other_inputs"] = []

    # in case of multi input; send the rest of the inputs in the config;
    # dbt_automation will handle the rest
    for dbtmodel, input_seq, columns in zip(
        other_input_models, seqs, other_input_columns
    ):
        op_config["other_inputs"].append(
            {
                "input": {
                    "input_type": dbtmodel.type,
                    "input_name": dbtmodel.name,
                    "source_name": dbtmodel.source_name,
                },
                "source_columns": columns,
                "seq": input_seq,
            }
        )

    input_config = {
        "config": op_config,
        "type": payload.op_type,
        "input_models": [
            {
                "uuid": str(model.uuid),
                "name": model.name,
                "display_name": model.display_name,
                "source_name": model.source_name,
                "schema": model.schema,
                "type": model.type,
            }
            for model in all_input_models
        ],
    }

    return (input_config, all_input_models)
12 changes: 12 additions & 0 deletions ddpui/schemas/dbt_workflow_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ class CreateDbtModelPayload(Schema):
other_inputs: list[InputModelPayload] = []


class EditDbtOperationPayload(Schema):
    """
    schema to define the payload required to edit a dbt operation
    """

    # operation config passed through to dbt_automation (validate_operation_config
    # adds "source_columns" and "other_inputs" keys to it)
    config: dict
    # operation type, e.g. "join" or "unionall"; multi-input ops require other_inputs
    op_type: str
    # uuid of the primary input model; required only for the first
    # operation in a model's chain
    input_uuid: str = ""
    # columns selected from the primary/chained input
    source_columns: list[str] = []
    # additional inputs (uuid, seq, columns) for multi-input operations
    other_inputs: list[InputModelPayload] = []


class CompleteDbtModelPayload(Schema):
"""
schema to define the payload required to create a custom org task
Expand Down

0 comments on commit c556984

Please sign in to comment.