diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index 9d1c909b..b958e6b2 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -25,6 +25,7 @@ ) from ddpui.core import dbtautomation_service +from ddpui.core.dbtautomation_service import sync_sources_for_warehouse transformapi = NinjaAPI(urls_namespace="transform") @@ -141,9 +142,9 @@ def sync_sources(request): if not orgdbt: raise HttpError(404, "DBT workspace not set up") - dbtautomation_service.sync_sources_for_warehouse(orgdbt, org_warehouse) + task = sync_sources_for_warehouse.delay(orgdbt.id, org_warehouse.id) - return {"success": 1} + return {"task_id": task.id} ########################## Models & Sources ############################################# diff --git a/ddpui/core/dbtautomation_service.py b/ddpui/core/dbtautomation_service.py index 282f1a0b..530a7c6c 100644 --- a/ddpui/core/dbtautomation_service.py +++ b/ddpui/core/dbtautomation_service.py @@ -44,6 +44,8 @@ from ddpui.models.dbt_workflow import OrgDbtModel, OrgDbtOperation from ddpui.utils.custom_logger import CustomLogger from ddpui.utils import secretsmanager +from ddpui.celery import app +from ddpui.utils.taskprogress import TaskProgress OPERATIONS_DICT = { "flatten": flatten_operation, @@ -203,16 +205,37 @@ def delete_dbt_model_in_project(orgdbt_model: OrgDbtModel): return True -def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse): +@app.task(bind=True) +def sync_sources_for_warehouse(self, org_dbt_id: str, org_warehouse_id: str): """ Sync all tables in all schemas in the warehouse. Dbt source name will be the same as the schema name. """ + org_dbt: OrgDbt = OrgDbt.objects.filter(id=org_dbt_id).first() + org_warehouse: OrgWarehouse = OrgWarehouse.objects.filter( + id=org_warehouse_id + ).first() + + taskprogress = TaskProgress(self.request.id) + + taskprogress.add( + { + "message": "started syncing sources", + "status": "running", + } + ) + dbt_project = dbtProject(Path(org_dbt.project_dir) / "dbtrepo") wclient = _get_wclient(org_warehouse) for schema in wclient.get_schemas(): - logger.info(f"syncing sources for schema {schema}") + taskprogress.add( + { + "message": f"reading sources for schema {schema} from warehouse", + "status": "running", + } + ) + logger.info(f"reading sources for schema {schema} for warehouse") sync_tables = [] for table in wclient.get_tables(schema): if not OrgDbtModel.objects.filter( @@ -220,6 +243,13 @@ def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse): ).first(): sync_tables.append(table) + taskprogress.add( + { + "message": f"Finished reading sources for schema {schema}", + "status": "running", + } + ) + if len(sync_tables) == 0: logger.info(f"No new tables in schema '{schema}' to be synced as sources.") continue @@ -240,6 +270,12 @@ def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse): logger.info("synced sources in dbt, saving to db now") sources = read_dbt_sources_in_project(org_dbt) logger.info("read fresh source from all yaml files") + taskprogress.add( + { + "message": f"Started syncing sources", + "status": "running", + } + ) for source in sources: orgdbt_source = OrgDbtModel.objects.filter( source_name=source["source_name"], name=source["input_name"], type="source" @@ -259,6 +295,13 @@ def sync_sources_for_warehouse(org_dbt: OrgDbt, org_warehouse: OrgWarehouse): orgdbt_source.save() + taskprogress.add( + { + "message": f"Sync finished", + "status": "running", + } + ) + logger.info("saved sources to db") return True