-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add uc volumes support #82
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,4 +9,5 @@ poetry | |
mypy | ||
SecretStorage | ||
readme-renderer | ||
twine | ||
twine | ||
databricks-sdk==0.33.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
import os | ||
import glob | ||
from typing import Optional, List, Union | ||
|
||
import fire | ||
|
@@ -54,49 +53,53 @@ def launch( | |
project_location: str = ".", | ||
dbfs_path: Optional[str] = None, | ||
watch: bool = True, | ||
glob_path: Optional[Union[str, List[str]]] = None | ||
glob_path: Optional[Union[str, List[str]]] = None, | ||
use_volumes: Optional[bool] = False, | ||
dst_path: Optional[str] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we deprecate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a warning there so people can still use it now but we can deprecate it in the next version. |
||
) -> None: | ||
""" | ||
Entrypoint of the application, triggers a build and deploy | ||
:param project_location: path to project code, default: `"."` | ||
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject | ||
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject. Only support dbfs path. | ||
:param watch: Set to false if you don't want to automatically sync your files | ||
:param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json" | ||
:param use_volumes: upload files to unity catalog volumes. | ||
:param dst_path: Destination path to store the files. Support both dbfs:/ and /Volumes. Ideally, we should use dst_path and deprecate dbfs_path. | ||
:return: | ||
""" | ||
if os.getenv("DATABRICKS_TOKEN") is None: | ||
raise Exception("DATABRICKS_TOKEN must be set for db-rocket to work") | ||
|
||
if dbfs_path is not None and not dbfs_path.startswith("dbfs:/"): | ||
raise Exception("`dbfs_path` must start with dbfs:/") | ||
|
||
try: | ||
execute_shell_command(f"databricks fs ls dbfs:/") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add here some validation if the databricks workspace API works? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I've added some :) |
||
except Exception as e: | ||
raise Exception( | ||
f"Error accessing DBFS via databricks-cli. Please check if your databricks token is set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: {e}" | ||
) | ||
|
||
if not dbfs_path: | ||
dbfs_path = f"dbfs:/temp/{os.environ['USER']}" | ||
if use_volumes: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we did that, we'd need to choose some default behavior in case no path is provided (since
And so you wouldn't be able to upload to dbfs without providing some path, which would be backwards incompatible. |
||
db_path = self.get_volumes_path(dst_path) | ||
else: | ||
path_to_use = dst_path if dst_path else dbfs_path | ||
db_path = self.get_dbfs_path(path_to_use) | ||
|
||
if watch: | ||
project_name = os.path.abspath(project_location).split("/")[-1] | ||
dbfs_path = f"{dbfs_path}/{project_name}" | ||
db_path = f"{db_path}/{project_name}" | ||
|
||
glob_paths = [] | ||
if isinstance(glob_path, str): | ||
glob_paths = [os.path.join(project_location, glob_path)] | ||
elif isinstance(glob_path, list): | ||
glob_paths = [os.path.join(project_location, path) for path in glob_path] | ||
|
||
self._build_and_deploy(watch=watch, project_location=project_location, dbfs_path=dbfs_path, glob_paths=glob_paths) | ||
self._build_and_deploy(watch=watch, project_location=project_location, db_path=db_path, glob_paths=glob_paths) | ||
if watch: | ||
watcher = FileWatcher( | ||
project_location, | ||
lambda x: self._build_and_deploy( | ||
watch=watch, | ||
modified_files=watcher.modified_files, | ||
dbfs_path=dbfs_path, | ||
db_path=db_path, | ||
project_location=project_location, | ||
glob_paths=glob_path | ||
), | ||
|
@@ -108,15 +111,15 @@ def _build_and_deploy( | |
self, | ||
watch: bool, | ||
project_location: str, | ||
dbfs_path: str, | ||
db_path: str, | ||
modified_files: Optional[List[str]] = None, | ||
glob_paths: Optional[List[str]] = None | ||
) -> None: | ||
if modified_files: | ||
logger.info(f"Found changes in {modified_files}. Overwriting them.") | ||
self._deploy( | ||
file_paths=modified_files, | ||
dbfs_path=dbfs_path, | ||
db_path=db_path, | ||
project_location=project_location, | ||
) | ||
return | ||
|
@@ -128,10 +131,10 @@ def _build_and_deploy( | |
wheel_path, wheel_file = self._create_python_project_wheel(project_location) | ||
self._deploy( | ||
file_paths=[wheel_path], | ||
dbfs_path=dbfs_path, | ||
db_path=db_path, | ||
project_location=os.path.dirname(wheel_path), | ||
) | ||
install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}/{wheel_file}' | ||
install_path = f"{self.get_install_path(db_path)}/{wheel_file}" | ||
|
||
dependency_files = ["requirements.in", "requirements.txt"] | ||
index_urls = [] | ||
|
@@ -183,10 +186,10 @@ def _build_and_deploy( | |
line.strip() for line in f.readlines() if "index-url" in line | ||
] | ||
self._deploy( | ||
file_paths=list(files), dbfs_path=dbfs_path, project_location=project_location | ||
file_paths=list(files), db_path=db_path, project_location=project_location | ||
) | ||
|
||
install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}' | ||
install_path = self.get_install_path(db_path) | ||
index_urls_options = " ".join(index_urls) | ||
|
||
if dependency_file_exist: | ||
|
@@ -215,16 +218,55 @@ def _build_and_deploy( | |
def _deploy( | ||
self, | ||
file_paths: List[str], | ||
dbfs_path: str, | ||
db_path: str, | ||
project_location: str | ||
) -> None: | ||
if self.is_dbfs(db_path): | ||
self._deploy_dbfs(file_paths, db_path, project_location) | ||
else: | ||
from databricks.sdk import WorkspaceClient | ||
w = WorkspaceClient() | ||
self._deploy_volumes(file_paths, db_path, project_location, w) | ||
|
||
def _deploy_dbfs( | ||
self, | ||
file_paths: List[str], | ||
db_path: str, | ||
project_location: str | ||
): | ||
def helper(file: str) -> None: | ||
target_path = f"{dbfs_path}/{os.path.relpath(file, project_location)}" | ||
target_path = f"{db_path}/{os.path.relpath(file, project_location)}" | ||
execute_shell_command(f"databricks fs cp --recursive --overwrite {file} {target_path}") | ||
logger.info(f"Uploaded {file} to {target_path}") | ||
|
||
execute_for_each_multithreaded(file_paths, lambda x: helper(x)) | ||
|
||
def _deploy_volumes( | ||
self, | ||
file_paths: List[str], | ||
db_path: str, | ||
project_location: str, | ||
workspace_client | ||
): | ||
def helper(wc, file: str) -> None: | ||
# sdk asks an absolute path | ||
if not os.path.isabs(file): | ||
cwd = os.getcwd() | ||
file = f"{cwd}/{file}" | ||
target_path = f"{db_path}/{os.path.relpath(file, project_location)}" | ||
# if the file already exists, sdk returns error message: The file being created already exists. | ||
# a feature request is already here: https://github.com/databricks/databricks-sdk-py/issues/548 | ||
try: | ||
wc.dbutils.fs.rm(target_path) | ||
except Exception: | ||
pass | ||
# sdk uses urllibs3 to parse paths. | ||
# It need to be file:// to be recognized as a local file. Otherwise it raises file not exist error | ||
wc.dbutils.fs.cp(f"file://{file}", target_path) | ||
logger.info(f"Uploaded {file} to {target_path}") | ||
|
||
execute_for_each_multithreaded(file_paths, lambda x: helper(workspace_client, x)) | ||
|
||
def _create_python_project_wheel(self, project_location: str) -> (str, str): | ||
dist_location = f"{project_location}/dist" | ||
execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true") | ||
|
@@ -250,6 +292,24 @@ def _create_python_project_wheel(self, project_location: str) -> (str, str): | |
wheel_path = f"{dist_location}/{wheel_file}" | ||
return wheel_path, wheel_file | ||
|
||
def get_dbfs_path(self, path: Optional[str]) -> str: | ||
if path and not self.is_dbfs(path): | ||
raise Exception("`dbfs_path` must start with dbfs:/") | ||
return path or f"dbfs:/temp/{os.environ['USER']}" | ||
|
||
def get_volumes_path(self, path: Optional[str]) -> str: | ||
if path and not path.startswith("/Volumes"): | ||
raise Exception("`use_volumes` is true. `dst_path` must start with /Volumes") | ||
return path or f"/Volumes/main/data_products/volume/db_rocket/{os.environ['USER']}" | ||
|
||
def get_install_path(self, db_path): | ||
if self.is_dbfs(db_path): | ||
return f'{db_path.replace("dbfs:/", "/dbfs/")}' | ||
return db_path | ||
|
||
def is_dbfs(self, db_path: str): | ||
return db_path.startswith("dbfs:/") | ||
|
||
|
||
def main(): | ||
fire.Fire(Rocket) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,15 +9,15 @@ | |
|
||
setuptools.setup( | ||
name="databricks-rocket", | ||
version="2.1.0", | ||
version="2.2.0", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's set it as a major version update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so bump it to 3.0.0? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's backwards compatible, so I would think it should be a minor version update. |
||
author="GetYourGuide", | ||
author_email="engineering.data-products@getyourguide.com", | ||
description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment", | ||
long_description=readme, | ||
long_description_content_type="text/markdown", | ||
url="https://github.com/getyourguide/db-rocket", | ||
packages=setuptools.find_packages(), | ||
install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli"], | ||
install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli", "databricks-sdk==0.33.0"], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to pin the version? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as you mentioned, this sdk is still in beta, and when integrating it, I looked into the project, which seems to be a lot of things under development, so pin the version can guarantee things don't break. |
||
entry_points={ | ||
"console_scripts": ["rocket=rocket.rocket:main", "dbrocket=rocket.rocket:main"] | ||
}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean
use_volumes
? we don't want to change the current behaviour but only add an option for using volumes.