Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpoints for orgtasks on transform page #446

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 132 additions & 41 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ninja import NinjaAPI
from ninja.errors import HttpError
from django.forms.models import model_to_dict

from ninja.errors import ValidationError
from ninja.responses import Response
Expand All @@ -23,12 +24,23 @@
OrgPrefectBlockv1,
)
from ddpui.models.org_user import OrgUser
from ddpui.models.tasks import DataflowOrgTask, OrgTask, TaskLock
from ddpui.models.tasks import (
DataflowOrgTask,
OrgTask,
TaskLock,
Task,
OrgTaskGeneratedBy,
)
from ddpui.ddpprefect.schema import (
PrefectSecretBlockCreate,
)
from ddpui.schemas.org_task_schema import CreateOrgTaskPayload
from ddpui.core.dbtfunctions import gather_dbt_project_params
from ddpui.core.orgtaskfunctions import create_transform_tasks
from ddpui.core.orgtaskfunctions import (
create_default_transform_tasks,
create_prefect_deployment_for_dbtcore_task,
delete_orgtask,
)
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.utils import timezone
Expand Down Expand Up @@ -80,8 +92,59 @@
return Response({"detail": "something went wrong"}, status=500)


@orgtaskapi.post("/", auth=auth.CanManagePipelines())
def post_orgtask(request, payload: CreateOrgTaskPayload):
"""Create a custom client org task (dbt or git). If base task is dbt run create a deployment"""
orguser: OrgUser = request.orguser
if orguser.org.dbt is None:
raise HttpError(400, "create a dbt workspace first")

Check warning on line 100 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L98-L100

Added lines #L98 - L100 were not covered by tests

task = Task.objects.filter(slug=payload.task_slug).first()

Check warning on line 102 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L102

Added line #L102 was not covered by tests

if task is None:
raise HttpError(404, "task not found")

Check warning on line 105 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L104-L105

Added lines #L104 - L105 were not covered by tests

parameters = {}
if payload.flags and len(payload.flags) > 0:
parameters["flags"] = payload.flags

Check warning on line 109 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L107-L109

Added lines #L107 - L109 were not covered by tests

if payload.options and len(payload.options.keys()) > 0:
parameters["options"] = payload.options

Check warning on line 112 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L111-L112

Added lines #L111 - L112 were not covered by tests

# create a deployment if the task type is run
orgtask = OrgTask.objects.create(

Check warning on line 115 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L115

Added line #L115 was not covered by tests
org=orguser.org, task=task, parameters=parameters, generated_by="client"
)

dataflow = None
if task.slug == TASK_DBTRUN:
dbt_project_params, error = gather_dbt_project_params(orguser.org)
if error:
raise HttpError(400, error)

Check warning on line 123 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L119-L123

Added lines #L119 - L123 were not covered by tests

# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(

Check warning on line 126 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L126

Added line #L126 was not covered by tests
org=orguser.org, block_type=DBTCLIPROFILE
).first()

if cli_profile_block is None:
raise HttpError(400, "dbt cli profile block not found")

Check warning on line 131 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L130-L131

Added lines #L130 - L131 were not covered by tests

dataflow = create_prefect_deployment_for_dbtcore_task(

Check warning on line 133 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L133

Added line #L133 was not covered by tests
orgtask, cli_profile_block, dbt_project_params
)

return {

Check warning on line 137 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L137

Added line #L137 was not covered by tests
**model_to_dict(orgtask, fields=["parameters"]),
"task_slug": orgtask.task.slug,
"dataflow": (
{**model_to_dict(dataflow, exclude=["id", "org"])} if dataflow else None
),
}


@orgtaskapi.post("transform/", auth=auth.CanManagePipelines())
def post_prefect_transformation_tasks(request):
def post_system_transformation_tasks(request):
"""
- Create a git pull url secret block
- Create a dbt cli profile block
Expand Down Expand Up @@ -186,7 +249,7 @@
raise HttpError(400, str(error)) from error

# create org tasks for the transformation page
_, error = create_transform_tasks(
_, error = create_default_transform_tasks(
orguser.org, cli_profile_block, dbt_project_params
)
if error:
Expand All @@ -197,7 +260,7 @@

@orgtaskapi.get("transform/", auth=auth.CanManagePipelines())
def get_prefect_transformation_tasks(request):
"""Fetch all dbt tasks for an org"""
"""Fetch all dbt tasks for an org; client or system"""
orguser: OrgUser = request.orguser

org_tasks = []
Expand All @@ -207,33 +270,32 @@
org=orguser.org,
task__type__in=["git", "dbt"],
)
.order_by("task__id")
.order_by("-generated_by")
.all()
):
# check if task is locked
lock = TaskLock.objects.filter(orgtask=org_task).first()

if org_task.task.type == "git":
command = "git " + org_task.get_task_parameters()
elif org_task.task.type == "dbt":
command = "dbt " + org_task.get_task_parameters()

# "git "/"dbt " + "run --full-refresh"/"pull"
command = org_task.task.type + " " + org_task.get_task_parameters()
# git pull : "git" + " " + "pull"
# dbt run --full-refresh : "dbt" + " " + "run --full-refresh"
command = org_task.task.type + " " + org_task.get_task_parameters()

org_tasks.append(
{
"label": org_task.task.label,
"slug": org_task.task.slug,
"id": org_task.id,
"deploymentId": None,
"lock": {
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None,
"lock": (
{
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None
),
"command": command,
"generated_by": org_task.generated_by,
}
)

Expand All @@ -248,7 +310,7 @@


@orgtaskapi.delete("transform/", auth=auth.CanManagePipelines())
def delete_prefect_transformation_tasks(request):
def delete_system_transformation_tasks(request):
"""delete tasks and related objects for an org"""
orguser: OrgUser = request.orguser

Expand All @@ -270,28 +332,14 @@
prefect_service.delete_dbt_cli_profile_block(cli_profile_block.block_id)
cli_profile_block.delete()

org_tasks = OrgTask.objects.filter(org=orguser.org).all()
for org_task in OrgTask.objects.filter(org=orguser.org, task__is_system=True).all():
_, error = delete_orgtask(org_task)

for org_task in org_tasks:
if org_task.task.slug == TASK_DBTRUN:
dataflow_orgtask = DataflowOrgTask.objects.filter(orgtask=org_task).first()
if dataflow_orgtask:
# delete the manual deployment for this
dataflow = dataflow_orgtask.dataflow
logger.info("deleting manual deployment for dbt run")
# do this in try catch because it can fail & throw error
try:
prefect_service.delete_deployment_by_id(dataflow.deployment_id)
except Exception:
pass
logger.info("FINISHED deleting manual deployment for dbt run")
logger.info("deleting OrgDataFlowv1")
dataflow.delete()
logger.info("deleting DataflowOrgTask")
dataflow_orgtask.delete()

logger.info("deleting org task %s", org_task.task.slug)
org_task.delete()
if error:
logger.info(

Check warning on line 339 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L339

Added line #L339 was not covered by tests
f"Failed deleting orgtask with id {org_task.id} of type {org_task.task.slug}. Skipping and continuing to next task deletion"
)
continue

Check warning on line 342 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L342

Added line #L342 was not covered by tests


@orgtaskapi.post("{orgtask_id}/run/", auth=auth.CanManagePipelines())
Expand Down Expand Up @@ -390,3 +438,46 @@
logger.info("released lock on task %s", org_task.task.slug)

return result


@orgtaskapi.delete("{orgtask_id}/", auth=auth.CanManagePipelines())
def post_delete_orgtask(request, orgtask_id): # pylint: disable=unused-argument
"""Delete client generated orgtask"""

orguser: OrgUser = request.orguser

Check warning on line 447 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L447

Added line #L447 was not covered by tests

org_task = OrgTask.objects.filter(org=orguser.org, id=orgtask_id).first()

Check warning on line 449 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L449

Added line #L449 was not covered by tests

if org_task is None:
raise HttpError(400, "task not found")

Check warning on line 452 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L451-L452

Added lines #L451 - L452 were not covered by tests

if org_task.task.type not in ["dbt", "git"]:
raise HttpError(400, "task not supported")

Check warning on line 455 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L454-L455

Added lines #L454 - L455 were not covered by tests

if orguser.org.dbt is None:
raise HttpError(400, "dbt is not configured for this client")

Check warning on line 458 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L457-L458

Added lines #L457 - L458 were not covered by tests

if org_task.generated_by == OrgTaskGeneratedBy.SYSTEM:
raise HttpError(400, "cannot delete system generated tasks")

Check warning on line 461 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L460-L461

Added lines #L460 - L461 were not covered by tests

# check if the task is locked
task_lock = TaskLock.objects.filter(orgtask=org_task).first()
if task_lock:
raise HttpError(

Check warning on line 466 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L464-L466

Added lines #L464 - L466 were not covered by tests
400,
f"Cannot delete, {task_lock.locked_by.user.email} is running this operation",
)

# make sure the org task is not part of a orchestrate pipeline
if DataflowOrgTask.objects.filter(orgtask=org_task, dataflow__dataflow_type='orchestrate').count() > 0:
raise HttpError(403, "Cannot delete the orgtask since its part of a pipeline")

Check warning on line 473 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L472-L473

Added lines #L472 - L473 were not covered by tests

_, error = delete_orgtask(org_task)

Check warning on line 475 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L475

Added line #L475 was not covered by tests

if error:
logger.info(

Check warning on line 478 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L477-L478

Added lines #L477 - L478 were not covered by tests
f"Failed deleting orgtask with id {org_task.id} of type {org_task.task.slug}. Skipping and continuing to next task deletion"
)
raise HttpError(400, error)

Check warning on line 481 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L481

Added line #L481 was not covered by tests

return {"success": 1}

Check warning on line 483 in ddpui/api/orgtask_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/orgtask_api.py#L483

Added line #L483 was not covered by tests
122 changes: 81 additions & 41 deletions ddpui/core/orgtaskfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,62 +12,102 @@
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpprefect import prefect_service
from ddpui.core.pipelinefunctions import setup_dbt_core_task_config
from ddpui.utils.constants import (
TASK_DBTRUN,
)
from ddpui.utils.constants import TASK_DBTRUN, TASK_AIRBYTESYNC
from ddpui.utils.helpers import generate_hash_id
from ddpui.ddpdbt.schema import DbtProjectParams

logger = CustomLogger("ddpui")


def create_transform_tasks(
def create_default_transform_tasks(
org: Org, cli_profile_block: OrgPrefectBlockv1, dbt_project_params: DbtProjectParams
):
"""Create all the transform (git, dbt) tasks"""
for task in Task.objects.filter(type__in=["dbt", "git"]).all():
for task in Task.objects.filter(type__in=["dbt", "git"], is_system=True).all():
org_task = OrgTask.objects.create(org=org, task=task)

if task.slug == TASK_DBTRUN:
# create deployment
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org.slug}-{task.slug}-{hash_code}"
dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"org_slug": org.slug,
}
},
)
create_prefect_deployment_for_dbtcore_task(
org_task, cli_profile_block, dbt_project_params
)

# store deployment record in django db
existing_dataflow = OrgDataFlowv1.objects.filter(
deployment_id=dataflow["deployment"]["id"]
).first()
if existing_dataflow:
existing_dataflow.delete()

new_dataflow = OrgDataFlowv1.objects.create(
org=org,
name=deployment_name,
deployment_name=dataflow["deployment"]["name"],
deployment_id=dataflow["deployment"]["id"],
dataflow_type="manual",
)
return None, None

DataflowOrgTask.objects.create(
dataflow=new_dataflow,
orgtask=org_task,
)

def create_prefect_deployment_for_dbtcore_task(
org_task: OrgTask,
cli_profile_block: OrgPrefectBlockv1,
dbt_project_params: DbtProjectParams,
):
hash_code = generate_hash_id(8)
deployment_name = f"manual-{org_task.org.slug}-{org_task.task.slug}-{hash_code}"
dataflow = prefect_service.create_dataflow_v1(
PrefectDataFlowCreateSchema3(
deployment_name=deployment_name,
flow_name=deployment_name,
orgslug=org_task.org.slug,
deployment_params={
"config": {
"tasks": [
setup_dbt_core_task_config(
org_task, cli_profile_block, dbt_project_params
).to_json()
],
"org_slug": org_task.org.slug,
}
},
)
)

# store deployment record in django db
existing_dataflow = OrgDataFlowv1.objects.filter(
deployment_id=dataflow["deployment"]["id"]
).first()
if existing_dataflow:
existing_dataflow.delete()

Check warning on line 68 in ddpui/core/orgtaskfunctions.py

View check run for this annotation

Codecov / codecov/patch

ddpui/core/orgtaskfunctions.py#L68

Added line #L68 was not covered by tests

new_dataflow = OrgDataFlowv1.objects.create(
org=org_task.org,
name=deployment_name,
deployment_name=dataflow["deployment"]["name"],
deployment_id=dataflow["deployment"]["id"],
dataflow_type="manual",
)

DataflowOrgTask.objects.create(
dataflow=new_dataflow,
orgtask=org_task,
)

return new_dataflow


def delete_orgtask(org_task: OrgTask):
"""Delete an orgtask; along with its deployment if its there"""

for dataflow_orgtask in DataflowOrgTask.objects.filter(
orgtask=org_task
).all(): # only long running task like TASK_DBTRUN, TASK_AIRBYTESYNC will have dataflow

# delete the manual deployment for this
dataflow = dataflow_orgtask.dataflow
if dataflow:
logger.info(f"deleting manual deployment for {org_task.task.slug}")

# do this in try catch because it can fail & throw error
try:
prefect_service.delete_deployment_by_id(dataflow.deployment_id)
except Exception:
pass

Check warning on line 102 in ddpui/core/orgtaskfunctions.py

View check run for this annotation

Codecov / codecov/patch

ddpui/core/orgtaskfunctions.py#L101-L102

Added lines #L101 - L102 were not covered by tests
logger.info("FINISHED deleting manual deployment for dbt run")
logger.info("deleting OrgDataFlowv1")
dataflow.delete()

logger.info("deleting DataflowOrgTask")
dataflow_orgtask.delete()

logger.info("deleting org task %s", org_task.task.slug)
org_task.delete()

return None, None
Empty file added ddpui/schemas/__init__.py
Empty file.
Loading
Loading