Draft: #756 - implement python workflow submissions (#762)
Signed-off-by: Kyle Valade <kylevalade@rivian.com>
Co-authored-by: Kyle Valade <kylevalade@rivian.com>
Co-authored-by: Ben Cassell <ben.cassell@databricks.com>
3 people authored Oct 10, 2024
1 parent 74c7862 commit 0e821b0
Showing 9 changed files with 735 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@
- Allow for the use of custom constraints, using the `custom` constraint type with an `expression` as the constraint (thanks @roydobbe). ([792](https://github.com/databricks/dbt-databricks/pull/792))
- Add "use_info_schema_for_columns" behavior flag to turn on use of information_schema to get column info where possible. This may have more latency but will not truncate complex data types the way that 'describe' can. ([808](https://github.com/databricks/dbt-databricks/pull/808))
- Add support for table_format: iceberg. This uses UniForm under the hood to provide iceberg compatibility for tables or incrementals. ([815](https://github.com/databricks/dbt-databricks/pull/815))
- Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
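
To illustrate the two entries above, a minimal sketch of a python model opting into the new method (model body, table name, and notification settings are illustrative rather than part of this commit; see the adapter docs for the full config surface):

```python
# Hypothetical python model: opts into the new long-lived Workflow method.
def model(dbt, session):
    dbt.config(
        materialized="table",
        submission_method="workflow_job",  # new submission method from #762
    )
    # Extra Jobs-API settings (e.g. email_notifications) can likewise be
    # supplied through model config for the other submission methods.
    return session.table("samples.nyctaxi.trips").limit(10)
```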

### Under the Hood

135 changes: 123 additions & 12 deletions dbt/adapters/databricks/api_client.py
@@ -3,9 +3,11 @@
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
import re
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Set

@@ -41,6 +43,11 @@ def post(
) -> Response:
return self.session.post(f"{self.prefix}{suffix}", json=json, params=params)

def put(
self, suffix: str = "", json: Optional[Any] = None, params: Optional[Dict[str, Any]] = None
) -> Response:
return self.session.put(f"{self.prefix}{suffix}", json=json, params=params)


class DatabricksApi(ABC):
def __init__(self, session: Session, host: str, api: str):
@@ -142,20 +149,38 @@ def get_folder(self, _: str, schema: str) -> str:
return f"/Shared/dbt_python_models/{schema}/"


-# Switch to this as part of 2.0.0 release
-class UserFolderApi(DatabricksApi, FolderApi):
+class CurrUserApi(DatabricksApi):
+
     def __init__(self, session: Session, host: str):
         super().__init__(session, host, "/api/2.0/preview/scim/v2")
         self._user = ""
 
-    def get_folder(self, catalog: str, schema: str) -> str:
-        if not self._user:
-            response = self.session.get("/Me")
-            if response.status_code != 200:
-                raise DbtRuntimeError(f"Error getting user folder.\n {response.content!r}")
-            self._user = response.json()["userName"]
-        folder = f"/Users/{self._user}/dbt_python_models/{catalog}/{schema}/"
+    def get_username(self) -> str:
+        if self._user:
+            return self._user
+
+        response = self.session.get("/Me")
+        if response.status_code != 200:
+            raise DbtRuntimeError(f"Error getting current user.\n {response.content!r}")
+
+        username = response.json()["userName"]
+        self._user = username
+        return username
+
+    def is_service_principal(self, username: str) -> bool:
+        uuid_pattern = r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
+        return bool(re.match(uuid_pattern, username, re.IGNORECASE))
+
+
+# Switch to this as part of 2.0.0 release
+class UserFolderApi(DatabricksApi, FolderApi):
+    def __init__(self, session: Session, host: str, user_api: CurrUserApi):
+        super().__init__(session, host, "/api/2.0/preview/scim/v2")
+        self.user_api = user_api
+
+    def get_folder(self, catalog: str, schema: str) -> str:
+        username = self.user_api.get_username()
+        folder = f"/Users/{username}/dbt_python_models/{catalog}/{schema}/"
         logger.debug(f"Using python model folder '{folder}'")
 
         return folder
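
A short usage sketch for the new pair above: Databricks reports a service principal's `userName` as its application-ID UUID, which is what `is_service_principal` matches. The wiring and ACL field names below are assumptions, not code from this commit:

```python
# Sketch: pick the correct ACL key for the current identity, since service
# principals are addressed by "service_principal_name" rather than "user_name".
username = curr_user.get_username()  # assumes curr_user = CurrUserApi(session, host)
owner_key = (
    "service_principal_name" if curr_user.is_service_principal(username) else "user_name"
)
owner_acl = {owner_key: username, "permission_level": "IS_OWNER"}
```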
@@ -302,9 +327,11 @@ class JobRunsApi(PollableApi):
def __init__(self, session: Session, host: str, polling_interval: int, timeout: int):
super().__init__(session, host, "/api/2.1/jobs/runs", polling_interval, timeout)

-    def submit(self, run_name: str, job_spec: Dict[str, Any]) -> str:
+    def submit(
+        self, run_name: str, job_spec: Dict[str, Any], **additional_job_settings: Dict[str, Any]
+    ) -> str:
         submit_response = self.session.post(
-            "/submit", json={"run_name": run_name, "tasks": [job_spec]}
+            "/submit", json={"run_name": run_name, "tasks": [job_spec], **additional_job_settings}
         )
if submit_response.status_code != 200:
raise DbtRuntimeError(f"Error creating python run.\n {submit_response.content!r}")
@@ -357,6 +384,87 @@ def cancel(self, run_id: str) -> None:
raise DbtRuntimeError(f"Cancel run {run_id} failed.\n {response.content!r}")


class JobPermissionsApi(DatabricksApi):
def __init__(self, session: Session, host: str):
super().__init__(session, host, "/api/2.0/permissions/jobs")

def put(self, job_id: str, access_control_list: List[Dict[str, Any]]) -> None:
request_body = {"access_control_list": access_control_list}

response = self.session.put(f"/{job_id}", json=request_body)
logger.debug(f"Workflow permissions update response={response.json()}")

if response.status_code != 200:
raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

def get(self, job_id: str) -> Dict[str, Any]:
response = self.session.get(f"/{job_id}")

if response.status_code != 200:
raise DbtRuntimeError(
f"Error fetching Databricks workflow permissions.\n {response.content!r}"
)

return response.json()
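
For reference, a hedged sketch of the access-control payload this wrapper expects (principals and permission levels are illustrative):

```python
# Sketch: keep ownership with the deploying identity and let a group manage runs.
permissions = JobPermissionsApi(session, host)
permissions.put(
    job_id,
    [
        {"user_name": "jane.doe@example.com", "permission_level": "IS_OWNER"},
        {"group_name": "data-engineering", "permission_level": "CAN_MANAGE_RUN"},
    ],
)
current_acl = permissions.get(job_id)  # full permissions document as a dict
```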


class WorkflowJobApi(DatabricksApi):

def __init__(self, session: Session, host: str):
super().__init__(session, host, "/api/2.1/jobs")

def search_by_name(self, job_name: str) -> List[Dict[str, Any]]:
response = self.session.get("/list", json={"name": job_name})

if response.status_code != 200:
raise DbtRuntimeError(f"Error fetching job by name.\n {response.content!r}")

return response.json().get("jobs", [])

def create(self, job_spec: Dict[str, Any]) -> str:
"""
:return: the job_id
"""
response = self.session.post("/create", json=job_spec)

if response.status_code != 200:
raise DbtRuntimeError(f"Error creating Workflow.\n {response.content!r}")

job_id = response.json()["job_id"]
logger.info(f"New workflow created with job id {job_id}")
return job_id

def update_job_settings(self, job_id: str, job_spec: Dict[str, Any]) -> None:
request_body = {
"job_id": job_id,
"new_settings": job_spec,
}
logger.debug(f"Job settings: {request_body}")
response = self.session.post("/reset", json=request_body)

if response.status_code != 200:
raise DbtRuntimeError(f"Error updating Workflow.\n {response.content!r}")

logger.debug(f"Workflow update response={response.json()}")

def run(self, job_id: str, enable_queueing: bool = True) -> str:
request_body = {
"job_id": job_id,
"queue": {
"enabled": enable_queueing,
},
}
response = self.session.post("/run-now", json=request_body)

if response.status_code != 200:
raise DbtRuntimeError(f"Error triggering run for workflow.\n {response.content!r}")

response_json = response.json()
logger.info(f"Workflow trigger response={response_json}")

return response_json["run_id"]
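
Together, these endpoints support an idempotent deploy-then-trigger loop, which is presumably how the new `workflow_job` helper drives them. A sketch, with an illustrative job name:

```python
# Sketch: reuse the workflow if it already exists, otherwise create it, then run.
workflows = WorkflowJobApi(session, host)
matches = workflows.search_by_name("dbt__my_python_model")
if matches:
    job_id = matches[0]["job_id"]
    workflows.update_job_settings(job_id, job_spec)  # converge to the new spec
else:
    job_id = workflows.create(job_spec)
run_id = workflows.run(job_id, enable_queueing=True)
```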


class DatabricksApiClient:
def __init__(
self,
@@ -368,13 +476,16 @@ def __init__(
     ):
         self.clusters = ClusterApi(session, host)
         self.command_contexts = CommandContextApi(session, host, self.clusters)
+        self.curr_user = CurrUserApi(session, host)
         if use_user_folder:
-            self.folders: FolderApi = UserFolderApi(session, host)
+            self.folders: FolderApi = UserFolderApi(session, host, self.curr_user)
         else:
             self.folders = SharedFolderApi()
         self.workspace = WorkspaceApi(session, host, self.folders)
         self.commands = CommandApi(session, host, polling_interval, timeout)
         self.job_runs = JobRunsApi(session, host, polling_interval, timeout)
+        self.workflows = WorkflowJobApi(session, host)
+        self.workflow_permissions = JobPermissionsApi(session, host)

@staticmethod
def create(
4 changes: 4 additions & 0 deletions dbt/adapters/databricks/impl.py
@@ -55,6 +55,9 @@
from dbt.adapters.databricks.python_models.python_submissions import (
ServerlessClusterPythonJobHelper,
)
from dbt.adapters.databricks.python_models.python_submissions import (
WorkflowPythonJobHelper,
)
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.relation import DatabricksRelationType
from dbt.adapters.databricks.relation import KEY_TABLE_PROVIDER
@@ -635,6 +638,7 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
"job_cluster": JobClusterPythonJobHelper,
"all_purpose_cluster": AllPurposeClusterPythonJobHelper,
"serverless_cluster": ServerlessClusterPythonJobHelper,
"workflow_job": WorkflowPythonJobHelper,
}
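
With this entry, a model configured with `submission_method: workflow_job` resolves to `WorkflowPythonJobHelper`. A minimal sketch of the lookup (the surrounding dispatch is dbt plumbing, simplified here):

```python
# Sketch: dbt looks the helper class up by the configured submission method.
helper_cls = adapter.python_submission_helpers["workflow_job"]
# => WorkflowPythonJobHelper, instantiated with the model's parsed config
#    and credentials before its submit() hook runs the compiled python code.
```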

@available