Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

495 update sync sources api #497

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
from ddpui.schemas.org_task_schema import DbtProjectSchema
from ddpui.schemas.dbt_workflow_schema import (
CreateDbtModelPayload,
SyncSourcesSchema,
CompleteDbtModelPayload,
)

from ddpui.core import dbtautomation_service
from ddpui.core.dbtautomation_service import sync_sources_for_warehouse

transformapi = NinjaAPI(urls_namespace="transform")

Expand Down Expand Up @@ -127,7 +127,7 @@ def delete_dbt_project(request, project_name: str):


@transformapi.post("/dbt_project/sync_sources/", auth=auth.CanManagePipelines())
def sync_sources(request, payload: SyncSourcesSchema):
def sync_sources(request):
"""
Sync sources from a given schema.
"""
Expand All @@ -142,37 +142,9 @@ def sync_sources(request, payload: SyncSourcesSchema):
if not orgdbt:
raise HttpError(404, "DBT workspace not set up")

sources_file_path, error = dbtautomation_service.sync_sources_to_dbt(
payload.schema_name, payload.source_name, org, org_warehouse
)
task = sync_sources_for_warehouse.delay(orgdbt.id, org_warehouse.id)

if error:
raise HttpError(422, error)

# sync sources to django db
logger.info("synced sources in dbt, saving to db now")
sources = dbtautomation_service.read_dbt_sources_in_project(orgdbt)
for source in sources:
orgdbt_source = OrgDbtModel.objects.filter(
source_name=source["source_name"], name=source["input_name"], type="source"
).first()

if not orgdbt_source:
orgdbt_source = OrgDbtModel(
uuid=uuid.uuid4(),
orgdbt=orgdbt,
source_name=source["source_name"],
name=source["input_name"],
display_name=source["input_name"],
type="source",
)

orgdbt_source.schema = source["schema"]
orgdbt_source.sql_path = sources_file_path

orgdbt_source.save()

return {"sources_file_path": str(sources_file_path)}
return {"task_id": task.id}


########################## Models & Sources #############################################
Expand Down Expand Up @@ -460,6 +432,7 @@ def get_dbt_project_DAG(request):
"output_cols": node.output_cols,
"config": node.config,
"type": "operation_node",
"target_model_id": node.dbtmodel.uuid,
}
)

Expand Down Expand Up @@ -495,7 +468,7 @@ def delete_model(request, model_uuid):
orgdbt_model = OrgDbtModel.objects.filter(uuid=model_uuid).first()
if not orgdbt_model:
raise HttpError(404, "model not found")

operations = OrgDbtOperation.objects.filter(dbtmodel=orgdbt_model).count()

if operations > 0:
Expand Down
117 changes: 114 additions & 3 deletions ddpui/core/dbtautomation_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import os, uuid
from pathlib import Path

from dbt_automation.operations.arithmetic import arithmetic, arithmetic_dbt_sql
Expand Down Expand Up @@ -31,15 +31,21 @@
merge_operations,
merge_operations_sql,
)
from dbt_automation.operations.syncsources import sync_sources
from dbt_automation.operations.syncsources import (
sync_sources,
generate_source_definitions_yaml,
)
from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.dbtsources import read_sources

from ddpui.schemas.dbt_workflow_schema import CompleteDbtModelPayload
from ddpui.models.org import Org, OrgDbt, OrgWarehouse
from ddpui.models.dbt_workflow import OrgDbtModel, OrgDbtOperation
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.celery import app
from ddpui.utils.taskprogress import TaskProgress

OPERATIONS_DICT = {
"flatten": flatten_operation,
Expand Down Expand Up @@ -67,6 +73,9 @@
}


logger = CustomLogger("ddpui")


def _get_wclient(org_warehouse: OrgWarehouse):
"""Connect to a warehouse and return the client"""
credentials = secretsmanager.retrieve_warehouse_credentials(org_warehouse)
Expand Down Expand Up @@ -144,7 +153,7 @@ def create_dbt_model_in_project(
return model_sql_path, output_cols


def sync_sources_to_dbt(
def sync_sources_in_schema(
schema_name: str, source_name: str, org: Org, org_warehouse: OrgWarehouse
):
"""
Expand Down Expand Up @@ -194,3 +203,105 @@ def delete_dbt_model_in_project(orgdbt_model: OrgDbtModel):
dbt_project = dbtProject(Path(orgdbt_model.orgdbt.project_dir) / "dbtrepo")
dbt_project.delete_model(orgdbt_model.sql_path)
return True


@app.task(bind=True)
def sync_sources_for_warehouse(self, org_dbt_id: str, org_warehouse_id: str):
    """
    Celery task: sync every table in every schema of the warehouse into the
    dbt project as sources, then mirror those sources into the django db.

    The dbt source name for each schema is the schema name itself.
    Progress updates are pushed via TaskProgress keyed on this task's
    celery request id.

    Args:
        org_dbt_id: primary key of the OrgDbt row (the dbt workspace).
        org_warehouse_id: primary key of the OrgWarehouse row.

    Returns:
        True on success.

    Raises:
        ValueError: if the OrgDbt or OrgWarehouse row does not exist.
    """
    org_dbt: OrgDbt = OrgDbt.objects.filter(id=org_dbt_id).first()
    org_warehouse: OrgWarehouse = OrgWarehouse.objects.filter(
        id=org_warehouse_id
    ).first()

    # fail fast with a clear error instead of an AttributeError further down
    if org_dbt is None:
        raise ValueError(f"OrgDbt {org_dbt_id} not found")
    if org_warehouse is None:
        raise ValueError(f"OrgWarehouse {org_warehouse_id} not found")

    taskprogress = TaskProgress(self.request.id)

    taskprogress.add(
        {
            "message": "started syncing sources",
            "status": "running",
        }
    )

    dbt_project = dbtProject(Path(org_dbt.project_dir) / "dbtrepo")
    wclient = _get_wclient(org_warehouse)

    for schema in wclient.get_schemas():
        taskprogress.add(
            {
                "message": f"reading sources for schema {schema} from warehouse",
                "status": "running",
            }
        )
        logger.info(f"reading sources for schema {schema} for warehouse")

        # only tables that are not already dbt models in this workspace
        # should become sources
        sync_tables = [
            table
            for table in wclient.get_tables(schema)
            if not OrgDbtModel.objects.filter(
                orgdbt=org_dbt, schema=schema, name=table, type="model"
            ).exists()
        ]

        taskprogress.add(
            {
                "message": f"Finished reading sources for schema {schema}",
                "status": "running",
            }
        )

        if not sync_tables:
            logger.info(f"No new tables in schema '{schema}' to be synced as sources.")
            continue

        # dbt-automation overwrites the sources file for this source name
        # (always the schema name), so re-running the task is idempotent
        source_yml_path = generate_source_definitions_yaml(
            schema, schema, sync_tables, dbt_project
        )

        logger.info(
            f"Generated yaml for {len(sync_tables)} tables for schema '{schema}' as sources; yaml at {source_yml_path}"
        )

    # sync sources to the django db; create rows that are not present yet.
    # it's okay to keep dangling sources that were deleted from the warehouse
    # but are still in our db; we can clear them up or give users an option
    # to delete, because deleting the dangling sources might delete their
    # workflow nodes & edges — the UI should show a warning for this
    logger.info("synced sources in dbt, saving to db now")
    sources = read_dbt_sources_in_project(org_dbt)
    logger.info("read fresh source from all yaml files")
    taskprogress.add(
        {
            "message": "Started syncing sources",
            "status": "running",
        }
    )
    for source in sources:
        # scope the lookup to this org's dbt workspace so an identically
        # named source belonging to another org is never matched (bug fix:
        # the lookup previously searched across all orgs)
        orgdbt_source = OrgDbtModel.objects.filter(
            orgdbt=org_dbt,
            source_name=source["source_name"],
            name=source["input_name"],
            type="source",
        ).first()
        if not orgdbt_source:
            orgdbt_source = OrgDbtModel.objects.create(
                uuid=uuid.uuid4(),
                orgdbt=org_dbt,
                source_name=source["source_name"],
                name=source["input_name"],
                display_name=source["input_name"],
                type="source",
            )

        # refresh mutable attributes on every run
        orgdbt_source.schema = source["schema"]
        orgdbt_source.sql_path = source["sql_path"]

        orgdbt_source.save()

    taskprogress.add(
        {
            "message": "Sync finished",
            # NOTE(review): status stays "running" even though the sync is
            # complete — confirm whether consumers expect a terminal state
            # such as "completed" here
            "status": "running",
        }
    )

    logger.info("saved sources to db")

    return True
27 changes: 27 additions & 0 deletions ddpui/migrations/0061_alter_orgdbtmodel_display_name_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 4.1.7 on 2024-03-07 07:13

from django.db import migrations, models


class Migration(migrations.Migration):
    # Widens three OrgDbtModel text columns to max_length=300 so that longer
    # source/model names and sql file paths fit; the model previously allowed
    # 100/100/200 characters for these fields (see ddpui/models/dbt_workflow.py).
    # Auto-generated by `makemigrations`; do not edit the operations by hand.
    dependencies = [
        ("ddpui", "0060_merge_20240302_1014"),
    ]

    operations = [
        migrations.AlterField(
            model_name="orgdbtmodel",
            name="display_name",
            field=models.CharField(max_length=300, null=True),
        ),
        migrations.AlterField(
            model_name="orgdbtmodel",
            name="name",
            field=models.CharField(max_length=300, null=True),
        ),
        migrations.AlterField(
            model_name="orgdbtmodel",
            name="sql_path",
            field=models.CharField(max_length=300, null=True),
        ),
    ]
6 changes: 3 additions & 3 deletions ddpui/models/dbt_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class OrgDbtModel(models.Model):

orgdbt = models.ForeignKey(OrgDbt, on_delete=models.CASCADE)
uuid = models.UUIDField(editable=False, unique=True, null=True)
name = models.CharField(max_length=100, null=True)
display_name = models.CharField(max_length=100, null=True)
name = models.CharField(max_length=300, null=True)
display_name = models.CharField(max_length=300, null=True)
schema = models.CharField(max_length=100, null=True)
sql_path = models.CharField(max_length=200, null=True)
sql_path = models.CharField(max_length=300, null=True)
type = models.CharField(
choices=OrgDbtModelType.choices(), max_length=50, default="model"
)
Expand Down
4 changes: 2 additions & 2 deletions ddpui/schemas/dbt_workflow_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ class SyncSourcesSchema(Schema):
schema to sync sources from the schema
"""

schema_name: str
source_name: str
schema_name: str = None
source_name: str = None
Loading