Skip to content

Commit

Permalink
setting up sync sources as a celery task
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishankoradia committed Mar 7, 2024
1 parent c05680a commit c9e0d7a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
5 changes: 3 additions & 2 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)

from ddpui.core import dbtautomation_service
from ddpui.core.dbtautomation_service import sync_sources_for_warehouse

transformapi = NinjaAPI(urls_namespace="transform")

Expand Down Expand Up @@ -141,9 +142,9 @@ def sync_sources(request):
if not orgdbt:
raise HttpError(404, "DBT workspace not set up")

dbtautomation_service.sync_sources_for_warehouse(orgdbt, org_warehouse)
task = sync_sources_for_warehouse.delay(orgdbt.id, org_warehouse.id)

return {"success": 1}
return {"task_id": task.id}


########################## Models & Sources #############################################
Expand Down
47 changes: 45 additions & 2 deletions ddpui/core/dbtautomation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from ddpui.models.dbt_workflow import OrgDbtModel, OrgDbtOperation
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.celery import app
from ddpui.utils.taskprogress import TaskProgress

OPERATIONS_DICT = {
"flatten": flatten_operation,
Expand Down Expand Up @@ -203,23 +205,51 @@ def delete_dbt_model_in_project(orgdbt_model: OrgDbtModel):
return True


def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse):
@app.task(bind=True)
def sync_sources_for_warehouse(self, org_dbt_id: str, org_warehouse_id: str):
"""
Sync all tables in all schemas in the warehouse.
Dbt source name will be the same as the schema name.
"""
org_dbt: OrgDbt = OrgDbt.objects.filter(id=org_dbt_id).first()
org_warehouse: OrgWarehouse = OrgWarehouse.objects.filter(
id=org_warehouse_id
).first()

taskprogress = TaskProgress(self.request.id)

taskprogress.add(
{
"message": "started syncing sources",
"status": "running",
}
)

dbt_project = dbtProject(Path(org_dbt.project_dir) / "dbtrepo")
wclient = _get_wclient(org_warehouse)

for schema in wclient.get_schemas():
logger.info(f"syncing sources for schema {schema}")
taskprogress.add(
{
"message": f"reading sources for schema {schema} from warehouse",
"status": "running",
}
)
logger.info(f"reading sources for schema {schema} for warehouse")
sync_tables = []
for table in wclient.get_tables(schema):
if not OrgDbtModel.objects.filter(
orgdbt=org_dbt, schema=schema, name=table, type="model"
).first():
sync_tables.append(table)

taskprogress.add(
{
"message": f"Finished reading sources for schema {schema}",
"status": "running",
}
)

if len(sync_tables) == 0:
logger.info(f"No new tables in schema '{schema}' to be synced as sources.")
continue
Expand All @@ -240,6 +270,12 @@ def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse):
logger.info("synced sources in dbt, saving to db now")
sources = read_dbt_sources_in_project(org_dbt)
logger.info("read fresh source from all yaml files")
taskprogress.add(
{
"message": f"Started syncing sources",
"status": "running",
}
)
for source in sources:
orgdbt_source = OrgDbtModel.objects.filter(
source_name=source["source_name"], name=source["input_name"], type="source"
Expand All @@ -259,6 +295,13 @@ def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse):

orgdbt_source.save()

taskprogress.add(
{
"message": f"Sync finished",
"status": "running",
}
)

logger.info("saved sources to db")

return True

0 comments on commit c9e0d7a

Please sign in to comment.