Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpoints for orgtasks on transform page #446

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ninja import NinjaAPI
from ninja.errors import HttpError
from django.forms.models import model_to_dict

from ninja.errors import ValidationError
from ninja.responses import Response
Expand All @@ -23,12 +24,16 @@
OrgPrefectBlockv1,
)
from ddpui.models.org_user import OrgUser
from ddpui.models.tasks import DataflowOrgTask, OrgTask, TaskLock
from ddpui.models.tasks import DataflowOrgTask, OrgTask, TaskLock, Task
from ddpui.ddpprefect.schema import (
PrefectSecretBlockCreate,
)
from ddpui.schemas.org_task_schema import CreateOrgTaskPayload
from ddpui.core.dbtfunctions import gather_dbt_project_params
from ddpui.core.orgtaskfunctions import create_transform_tasks
from ddpui.core.orgtaskfunctions import (
create_transform_tasks,
create_prefect_deployment_for_dbtcore_task,
)
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.utils import timezone
Expand Down Expand Up @@ -80,6 +85,57 @@
return Response({"detail": "something went wrong"}, status=500)


@orgtaskapi.post("/", auth=auth.CanManagePipelines())
def post_org_task(request, payload: CreateOrgTaskPayload):
"""Create a custom client org task (dbt or git). If base task is dbt run create a deployment"""
orguser: OrgUser = request.orguser
if orguser.org.dbt is None:
raise HttpError(400, "create a dbt workspace first")

Check warning on line 93 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L91-L93

Added lines #L91 - L93 were not covered by tests

task = Task.objects.filter(slug=payload.task_slug).first()

Check warning on line 95 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L95

Added line #L95 was not covered by tests

if task is None:
raise HttpError(404, "task not found")

Check warning on line 98 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L97-L98

Added lines #L97 - L98 were not covered by tests

parameters = {}
if payload.flags and len(payload.flags) > 0:
parameters["flags"] = payload.flags

Check warning on line 102 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L100-L102

Added lines #L100 - L102 were not covered by tests

if payload.options and len(payload.options.keys()) > 0:
parameters["options"] = payload.options

Check warning on line 105 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L104-L105

Added lines #L104 - L105 were not covered by tests

# create a deployment if the task type is run
orgtask = OrgTask.objects.create(

Check warning on line 108 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L108

Added line #L108 was not covered by tests
org=orguser.org, task=task, parameters=parameters, generated_by="client"
)

dataflow = None
if task.slug == TASK_DBTRUN:
dbt_project_params, error = gather_dbt_project_params(orguser.org)
if error:
raise HttpError(400, error)

Check warning on line 116 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L112-L116

Added lines #L112 - L116 were not covered by tests

# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(

Check warning on line 119 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L119

Added line #L119 was not covered by tests
org=orguser.org, block_type=DBTCLIPROFILE
).first()

if cli_profile_block is None:
raise HttpError(400, "dbt cli profile block not found")

Check warning on line 124 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L123-L124

Added lines #L123 - L124 were not covered by tests

dataflow = create_prefect_deployment_for_dbtcore_task(

Check warning on line 126 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L126

Added line #L126 was not covered by tests
orgtask, cli_profile_block, dbt_project_params
)

return {

Check warning on line 130 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L130

Added line #L130 was not covered by tests
**model_to_dict(orgtask, fields=["parameters"]),
"task_slug": orgtask.task.slug,
"dataflow": (
{**model_to_dict(dataflow, exclude=["id", "org"])} if dataflow else None
),
}


@orgtaskapi.post("transform/", auth=auth.CanManagePipelines())
def post_prefect_transformation_tasks(request):
"""
Expand Down Expand Up @@ -213,26 +269,23 @@
# check if task is locked
lock = TaskLock.objects.filter(orgtask=org_task).first()

if org_task.task.type == "git":
command = "git " + org_task.get_task_parameters()
elif org_task.task.type == "dbt":
command = "dbt " + org_task.get_task_parameters()

# "git "/"dbt " + "run --full-refresh"/"pull"
command = org_task.task.type + " " + org_task.get_task_parameters()
command = org_task.task.type + " " + org_task.get_task_parameters()

org_tasks.append(
{
"label": org_task.task.label,
"slug": org_task.task.slug,
"id": org_task.id,
"deploymentId": None,
"lock": {
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None,
"lock": (
{
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None
),
"command": command,
}
)
Expand Down
84 changes: 48 additions & 36 deletions ddpui/core/orgtaskfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,56 @@

if task.slug == TASK_DBTRUN:
# create deployment
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org.slug}-{task.slug}-{hash_code}"
dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"org_slug": org.slug,
}
},
)
create_prefect_deployment_for_dbtcore_task(
org_task, cli_profile_block, dbt_project_params
)

# store deployment record in django db
existing_dataflow = OrgDataFlowv1.objects.filter(
deployment_id=dataflow["deployment"]["id"]
).first()
if existing_dataflow:
existing_dataflow.delete()
return None, None

new_dataflow = OrgDataFlowv1.objects.create(
org=org,
name=deployment_name,
deployment_name=dataflow["deployment"]["name"],
deployment_id=dataflow["deployment"]["id"],
dataflow_type="manual",
)

DataflowOrgTask.objects.create(
dataflow=new_dataflow,
orgtask=org_task,
)
def create_prefect_deployment_for_dbtcore_task(
org_task: OrgTask,
cli_profile_block: OrgPrefectBlockv1,
dbt_project_params: DbtProjectParams,
):
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"
dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org_task.org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"org_slug": org_task.org.slug,
}
},
)
)

return None, None
# store deployment record in django db
existing_dataflow = OrgDataFlowv1.objects.filter(
deployment_id=dataflow["deployment"]["id"]
).first()
if existing_dataflow:
existing_dataflow.delete()

Check warning on line 70 in ddpui/core/orgtaskfunctions.py

View check run for this annotation

Codecov / codecov/patch

ddpui/core/orgtaskfunctions.py#L70

Added line #L70 was not covered by tests

new_dataflow = OrgDataFlowv1.objects.create(
org=org_task.org,
name=deployment_name,
deployment_name=dataflow["deployment"]["name"],
deployment_id=dataflow["deployment"]["id"],
dataflow_type="manual",
)

DataflowOrgTask.objects.create(
dataflow=new_dataflow,
orgtask=org_task,
)

return new_dataflow
Empty file added ddpui/schemas/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions ddpui/schemas/org_task_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from ninja import Schema, Field


class CreateOrgTaskPayload(Schema):
"""
schema to define the payload required to create a custom org task
"""

task_slug: str
flags: list | None
options: dict | None
Loading