Skip to content

Commit

Permalink
Merge pull request #800 from DalgoT4D/bug-deploymentids-for-orgtask
Browse files (browse the repository at this point in the history)
Fixed the bug in fetching the correct (manual) dataflow for an orgtask, and also fixed the N+1 query problem
  • Loading branch information
fatchat authored Aug 8, 2024
2 parents 88d9b85 + 976dbbb commit 2f75552
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
create_prefect_deployment_for_dbtcore_task,
delete_orgtask,
fetch_orgtask_lock,
fetch_orgtask_lock_v1,
)
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
Expand Down Expand Up @@ -293,46 +294,61 @@ def get_prefect_transformation_tasks(request):
"""Fetch all dbt tasks for an org; client or system"""
orguser: OrgUser = request.orguser

org_tasks = []

for org_task in (
org_tasks = (
OrgTask.objects.filter(
org=orguser.org,
task__type__in=["git", "dbt"],
)
.order_by("-generated_by")
.select_related("task")
.prefetch_related(
Prefetch("tasklock", queryset=TaskLock.objects.all()),
Prefetch("orgtaskdataflows", queryset=DataflowOrgTask.objects.all()),
)
):
)

all_org_task_ids = org_tasks.values_list("id", flat=True)
all_org_task_locks = TaskLock.objects.filter(orgtask_id__in=all_org_task_ids)

all_dataflow_orgtasks = DataflowOrgTask.objects.filter(
orgtask_id__in=all_org_task_ids, dataflow__dataflow_type="manual"
).select_related("dataflow")

res = []

for org_task in org_tasks:
# git pull : "git" + " " + "pull"
# dbt run --full-refresh : "dbt" + " " + "run --full-refresh"
command = org_task.task.type + " " + org_task.get_task_parameters()

org_tasks.append(
lock = None
all_locks = [
lock for lock in all_org_task_locks if lock.orgtask_id == org_task.id
]
if len(all_locks) > 0:
lock = all_locks[0]

res.append(
{
"label": org_task.task.label,
"slug": org_task.task.slug,
"id": org_task.id,
"uuid": org_task.uuid,
"deploymentId": None,
"lock": fetch_orgtask_lock(org_task),
"lock": fetch_orgtask_lock_v1(org_task, lock),
"command": command,
"generated_by": org_task.generated_by,
"seq": TRANSFORM_TASKS_SEQ[org_task.task.slug],
}
)

# fetch the manual deploymentId for the dbt run task
if org_task.task.slug == TASK_DBTRUN:
dataflow_orgtask = org_task.orgtaskdataflows.first()
org_tasks[-1]["deploymentId"] = (
dataflow_orgtask.dataflow.deployment_id if dataflow_orgtask else None
)
# fetch the manual deploymentId for the long running dbt tasks
dataflow_orgtasks = [
dfot for dfot in all_dataflow_orgtasks if dfot.orgtask_id == org_task.id
]
res[-1]["deploymentId"] = (
dataflow_orgtasks[0].dataflow.deployment_id
if len(dataflow_orgtasks) > 0
else None
)

return org_tasks
return res


@orgtaskapi.delete("transform/", auth=auth.CustomAuthMiddleware())
Expand Down

0 comments on commit 2f75552

Please sign in to comment.