Merge pull request #497 from DalgoT4D/495-update-sync-sources-api
495 update sync sources api
Ishankoradia authored Mar 7, 2024
2 parents 947a91e + c9e0d7a commit 41a9774
Showing 5 changed files with 152 additions and 41 deletions.
39 changes: 6 additions & 33 deletions ddpui/api/transform_api.py
@@ -21,11 +21,11 @@
from ddpui.schemas.org_task_schema import DbtProjectSchema
from ddpui.schemas.dbt_workflow_schema import (
CreateDbtModelPayload,
SyncSourcesSchema,
CompleteDbtModelPayload,
)

from ddpui.core import dbtautomation_service
from ddpui.core.dbtautomation_service import sync_sources_for_warehouse

transformapi = NinjaAPI(urls_namespace="transform")

@@ -127,7 +127,7 @@ def delete_dbt_project(request, project_name: str):


@transformapi.post("/dbt_project/sync_sources/", auth=auth.CanManagePipelines())
def sync_sources(request, payload: SyncSourcesSchema):
def sync_sources(request):
"""
Sync sources from a given schema.
"""
@@ -142,37 +142,9 @@ def sync_sources(request, payload: SyncSourcesSchema):
if not orgdbt:
raise HttpError(404, "DBT workspace not set up")

sources_file_path, error = dbtautomation_service.sync_sources_to_dbt(
payload.schema_name, payload.source_name, org, org_warehouse
)
task = sync_sources_for_warehouse.delay(orgdbt.id, org_warehouse.id)

if error:
raise HttpError(422, error)

# sync sources to django db
logger.info("synced sources in dbt, saving to db now")
sources = dbtautomation_service.read_dbt_sources_in_project(orgdbt)
for source in sources:
orgdbt_source = OrgDbtModel.objects.filter(
source_name=source["source_name"], name=source["input_name"], type="source"
).first()

if not orgdbt_source:
orgdbt_source = OrgDbtModel(
uuid=uuid.uuid4(),
orgdbt=orgdbt,
source_name=source["source_name"],
name=source["input_name"],
display_name=source["input_name"],
type="source",
)

orgdbt_source.schema = source["schema"]
orgdbt_source.sql_path = sources_file_path

orgdbt_source.save()

return {"sources_file_path": str(sources_file_path)}
return {"task_id": task.id}


########################## Models & Sources #############################################
@@ -460,6 +432,7 @@ def get_dbt_project_DAG(request):
"output_cols": node.output_cols,
"config": node.config,
"type": "operation_node",
"target_model_id": node.dbtmodel.uuid,
}
)

@@ -495,7 +468,7 @@ def delete_model(request, model_uuid):
orgdbt_model = OrgDbtModel.objects.filter(uuid=model_uuid).first()
if not orgdbt_model:
raise HttpError(404, "model not found")

operations = OrgDbtOperation.objects.filter(dbtmodel=orgdbt_model).count()

if operations > 0:
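With this change, the sync_sources endpoint no longer performs the sync inline or requires a SyncSourcesSchema payload; it dispatches a Celery task and returns the task id. Below is a minimal client-side sketch (not part of this commit) of triggering the sync; the host, URL prefix, and auth header are assumptions and should be adjusted to the deployment.

import requests

BASE_URL = "http://localhost:8002"  # hypothetical backend host

def trigger_sync_sources(token: str) -> str:
    """Kick off the async source sync and return the Celery task id."""
    response = requests.post(
        f"{BASE_URL}/api/transform/dbt_project/sync_sources/",  # assumed mount prefix for the 'transform' namespace
        headers={"Authorization": f"Bearer {token}"},  # assumed auth scheme behind CanManagePipelines
        timeout=30,
    )
    response.raise_for_status()
    # the endpoint now returns {"task_id": ...} instead of {"sources_file_path": ...}
    return response.json()["task_id"]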
117 changes: 114 additions & 3 deletions ddpui/core/dbtautomation_service.py
@@ -1,4 +1,4 @@
import os
import os, uuid
from pathlib import Path

from dbt_automation.operations.arithmetic import arithmetic, arithmetic_dbt_sql
@@ -31,15 +31,21 @@
merge_operations,
merge_operations_sql,
)
from dbt_automation.operations.syncsources import sync_sources
from dbt_automation.operations.syncsources import (
sync_sources,
generate_source_definitions_yaml,
)
from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.dbtsources import read_sources

from ddpui.schemas.dbt_workflow_schema import CompleteDbtModelPayload
from ddpui.models.org import Org, OrgDbt, OrgWarehouse
from ddpui.models.dbt_workflow import OrgDbtModel, OrgDbtOperation
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.celery import app
from ddpui.utils.taskprogress import TaskProgress

OPERATIONS_DICT = {
"flatten": flatten_operation,
@@ -67,6 +73,9 @@
}


logger = CustomLogger("ddpui")


def _get_wclient(org_warehouse: OrgWarehouse):
"""Connect to a warehouse and return the client"""
credentials = secretsmanager.retrieve_warehouse_credentials(org_warehouse)
@@ -144,7 +153,7 @@ def create_dbt_model_in_project(
return model_sql_path, output_cols


def sync_sources_to_dbt(
def sync_sources_in_schema(
schema_name: str, source_name: str, org: Org, org_warehouse: OrgWarehouse
):
"""
@@ -194,3 +203,105 @@ def delete_dbt_model_in_project(orgdbt_model: OrgDbtModel):
dbt_project = dbtProject(Path(orgdbt_model.orgdbt.project_dir) / "dbtrepo")
dbt_project.delete_model(orgdbt_model.sql_path)
return True


@app.task(bind=True)
def sync_sources_for_warehouse(self, org_dbt_id: str, org_warehouse_id: str):
"""
Sync all tables in all schemas in the warehouse.
Dbt source name will be the same as the schema name.
"""
org_dbt: OrgDbt = OrgDbt.objects.filter(id=org_dbt_id).first()
org_warehouse: OrgWarehouse = OrgWarehouse.objects.filter(
id=org_warehouse_id
).first()

taskprogress = TaskProgress(self.request.id)

taskprogress.add(
{
"message": "started syncing sources",
"status": "running",
}
)

dbt_project = dbtProject(Path(org_dbt.project_dir) / "dbtrepo")
wclient = _get_wclient(org_warehouse)

for schema in wclient.get_schemas():
taskprogress.add(
{
"message": f"reading sources for schema {schema} from warehouse",
"status": "running",
}
)
logger.info(f"reading sources for schema {schema} for warehouse")
sync_tables = []
for table in wclient.get_tables(schema):
if not OrgDbtModel.objects.filter(
orgdbt=org_dbt, schema=schema, name=table, type="model"
).first():
sync_tables.append(table)

taskprogress.add(
{
"message": f"Finished reading sources for schema {schema}",
"status": "running",
}
)

if len(sync_tables) == 0:
logger.info(f"No new tables in schema '{schema}' to be synced as sources.")
continue

        # dbt_automation overwrites the sources and the yaml file when the source name already exists (here it always does, since the source name = the schema name)
source_yml_path = generate_source_definitions_yaml(
schema, schema, sync_tables, dbt_project
)

logger.info(
f"Generated yaml for {len(sync_tables)} tables for schema '{schema}' as sources; yaml at {source_yml_path}"
)

    # sync sources to the django db; create entries if not present
    # it's okay to have dangling sources that were deleted from the warehouse but are still in our db;
    # we can clean them up later or give users an option to delete them,
    # because deleting the dangling sources might also delete their workflow nodes & edges; they should see a warning for this on the UI
logger.info("synced sources in dbt, saving to db now")
sources = read_dbt_sources_in_project(org_dbt)
logger.info("read fresh source from all yaml files")
taskprogress.add(
{
"message": f"Started syncing sources",
"status": "running",
}
)
for source in sources:
orgdbt_source = OrgDbtModel.objects.filter(
source_name=source["source_name"], name=source["input_name"], type="source"
).first()
if not orgdbt_source:
orgdbt_source = OrgDbtModel.objects.create(
uuid=uuid.uuid4(),
orgdbt=org_dbt,
source_name=source["source_name"],
name=source["input_name"],
display_name=source["input_name"],
type="source",
)

orgdbt_source.schema = source["schema"]
orgdbt_source.sql_path = source["sql_path"]

orgdbt_source.save()

taskprogress.add(
{
"message": f"Sync finished",
"status": "running",
}
)

logger.info("saved sources to db")

return True
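The task reports progress through TaskProgress (its implementation is not part of this diff) and returns True on completion. A hedged sketch of waiting for the dispatched task from another process, using Celery's standard AsyncResult API:

import time

from celery.result import AsyncResult

from ddpui.celery import app

def wait_for_sync(task_id: str, poll_seconds: int = 5) -> bool:
    """Poll sync_sources_for_warehouse until it finishes; True means success."""
    result = AsyncResult(task_id, app=app)
    while not result.ready():
        # progress messages written via TaskProgress can be surfaced to the UI meanwhile
        time.sleep(poll_seconds)
    return result.successful()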
27 changes: 27 additions & 0 deletions ddpui/migrations/0061_alter_orgdbtmodel_display_name_and_more.py
@@ -0,0 +1,27 @@
# Generated by Django 4.1.7 on 2024-03-07 07:13

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("ddpui", "0060_merge_20240302_1014"),
]

operations = [
migrations.AlterField(
model_name="orgdbtmodel",
name="display_name",
field=models.CharField(max_length=300, null=True),
),
migrations.AlterField(
model_name="orgdbtmodel",
name="name",
field=models.CharField(max_length=300, null=True),
),
migrations.AlterField(
model_name="orgdbtmodel",
name="sql_path",
field=models.CharField(max_length=300, null=True),
),
]
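This migration widens the OrgDbtModel character fields (see the model change in the next file). A minimal way to apply it programmatically, using the standard Django management API (not part of this commit):

from django.core.management import call_command

# equivalent to running `python manage.py migrate ddpui` from the shell
call_command("migrate", "ddpui", "0061_alter_orgdbtmodel_display_name_and_more")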
6 changes: 3 additions & 3 deletions ddpui/models/dbt_workflow.py
@@ -25,10 +25,10 @@ class OrgDbtModel(models.Model):

orgdbt = models.ForeignKey(OrgDbt, on_delete=models.CASCADE)
uuid = models.UUIDField(editable=False, unique=True, null=True)
name = models.CharField(max_length=100, null=True)
display_name = models.CharField(max_length=100, null=True)
name = models.CharField(max_length=300, null=True)
display_name = models.CharField(max_length=300, null=True)
schema = models.CharField(max_length=100, null=True)
sql_path = models.CharField(max_length=200, null=True)
sql_path = models.CharField(max_length=300, null=True)
type = models.CharField(
choices=OrgDbtModelType.choices(), max_length=50, default="model"
)
4 changes: 2 additions & 2 deletions ddpui/schemas/dbt_workflow_schema.py
@@ -29,5 +29,5 @@ class SyncSourcesSchema(Schema):
schema to sync sources from the schema
"""

schema_name: str
source_name: str
schema_name: str = None
source_name: str = None
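Both fields of SyncSourcesSchema now default to None, so the endpoint can be invoked without a request body. A small illustrative check (hedged; django-ninja schemas are pydantic models):

from ddpui.schemas.dbt_workflow_schema import SyncSourcesSchema

payload = SyncSourcesSchema()  # no body required any more
assert payload.schema_name is None and payload.source_name is None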
