add uc volumes support #82
Changes from all commits
Requirements file:

```diff
@@ -9,4 +9,5 @@ poetry
 mypy
 SecretStorage
 readme-renderer
-twine
+twine
+databricks-sdk==0.33.0
```
rocket/rocket.py:

```diff
@@ -1,9 +1,9 @@
 import os
 import glob
 from typing import Optional, List, Union

 import fire

+from databricks.sdk import WorkspaceClient
 from rocket.file_watcher import FileWatcher
 from rocket.logger import logger
 from rocket.utils import (
```
```diff
@@ -54,49 +54,63 @@ def launch(
     project_location: str = ".",
     dbfs_path: Optional[str] = None,
     watch: bool = True,
-    glob_path: Optional[Union[str, List[str]]] = None
+    glob_path: Optional[Union[str, List[str]]] = None,
+    use_volumes: Optional[bool] = False,
+    dst_path: Optional[str] = None,
```

**Review comment:** Maybe we deprecate `dbfs_path`?

**Reply:** I added a warning there so people can still use it now, but we can deprecate it in the next version.
```diff
 ) -> None:
     """
     Entrypoint of the application, triggers a build and deploy
     :param project_location: path to project code, default: `"."`
-    :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
+    :param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject. Only supports DBFS paths.
     :param watch: Set to false if you don't want to automatically sync your files
     :param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json"
+    :param use_volumes: upload files to Unity Catalog volumes.
+    :param dst_path: destination path to store the files. Supports both dbfs:/ and /Volumes. Ideally, we should use dst_path and deprecate dbfs_path.
     :return:
     """
     if os.getenv("DATABRICKS_TOKEN") is None:
         raise Exception("DATABRICKS_TOKEN must be set for db-rocket to work")

     if dbfs_path is not None and not dbfs_path.startswith("dbfs:/"):
         raise Exception("`dbfs_path` must start with dbfs:/")

-    try:
-        execute_shell_command(f"databricks fs ls dbfs:/")
-    except Exception as e:
-        raise Exception(
-            f"Error accessing DBFS via databricks-cli. Please check if your databricks token is set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: {e}"
-        )
+    base_dbfs_access_error_message = ("Please check if your databricks token is set and valid? "
+                                      "Try to generate a new token and update existing one with "
+                                      "`databricks configure --token`.")
+    if use_volumes:
```

**Review comment:** Maybe something like […]

**Reply:** If we did that, we'd need to choose some default behavior in case no path is provided (since […]), and so you wouldn't be able to upload to DBFS without providing some path, which would be backwards incompatible.
```diff
+        try:
+            workspace_client = WorkspaceClient()
+            workspace_client.dbutils.fs.ls("dbfs:/")
+        except Exception as e:
+            raise Exception(
+                f"Could not access dbfs using databricks SDK. {base_dbfs_access_error_message} Error details: {e}"
+            )
+        db_path = self.get_volumes_path(dst_path)
+    else:
+        try:
+            execute_shell_command(f"databricks fs ls dbfs:/")
+        except Exception as e:
+            raise Exception(
+                f"Error accessing DBFS via databricks-cli. {base_dbfs_access_error_message} Error details: {e}"
+            )
+        path_to_use = dst_path if dst_path else dbfs_path
+        db_path = self.get_dbfs_path(path_to_use)

-    if not dbfs_path:
-        dbfs_path = f"dbfs:/temp/{os.environ['USER']}"
     if watch:
         project_name = os.path.abspath(project_location).split("/")[-1]
-        dbfs_path = f"{dbfs_path}/{project_name}"
+        db_path = f"{db_path}/{project_name}"

     glob_paths = []
     if isinstance(glob_path, str):
         glob_paths = [os.path.join(project_location, glob_path)]
     elif isinstance(glob_path, list):
         glob_paths = [os.path.join(project_location, path) for path in glob_path]

-    self._build_and_deploy(watch=watch, project_location=project_location, dbfs_path=dbfs_path, glob_paths=glob_paths)
+    self._build_and_deploy(watch=watch, project_location=project_location, db_path=db_path, glob_paths=glob_paths)
     if watch:
         watcher = FileWatcher(
             project_location,
             lambda x: self._build_and_deploy(
                 watch=watch,
                 modified_files=watcher.modified_files,
-                dbfs_path=dbfs_path,
+                db_path=db_path,
                 project_location=project_location,
                 glob_paths=glob_path
             ),
```
```diff
@@ -108,15 +122,15 @@ def _build_and_deploy(
     self,
     watch: bool,
     project_location: str,
-    dbfs_path: str,
+    db_path: str,
     modified_files: Optional[List[str]] = None,
     glob_paths: Optional[List[str]] = None
 ) -> None:
     if modified_files:
         logger.info(f"Found changes in {modified_files}. Overwriting them.")
         self._deploy(
             file_paths=modified_files,
-            dbfs_path=dbfs_path,
+            db_path=db_path,
             project_location=project_location,
         )
         return
```
```diff
@@ -128,10 +142,10 @@ def _build_and_deploy(
     wheel_path, wheel_file = self._create_python_project_wheel(project_location)
     self._deploy(
         file_paths=[wheel_path],
-        dbfs_path=dbfs_path,
+        db_path=db_path,
         project_location=os.path.dirname(wheel_path),
     )
-    install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}/{wheel_file}'
+    install_path = f"{self.get_install_path(db_path)}/{wheel_file}"

     dependency_files = ["requirements.in", "requirements.txt"]
     index_urls = []
```
```diff
@@ -183,10 +197,10 @@ def _build_and_deploy(
                 line.strip() for line in f.readlines() if "index-url" in line
             ]
         self._deploy(
-            file_paths=list(files), dbfs_path=dbfs_path, project_location=project_location
+            file_paths=list(files), db_path=db_path, project_location=project_location
         )

-        install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}'
+        install_path = self.get_install_path(db_path)
         index_urls_options = " ".join(index_urls)

     if dependency_file_exist:
```
```diff
@@ -215,16 +229,54 @@ def _build_and_deploy(
 def _deploy(
     self,
     file_paths: List[str],
-    dbfs_path: str,
+    db_path: str,
     project_location: str
 ) -> None:
+    if self.is_dbfs(db_path):
+        self._deploy_dbfs(file_paths, db_path, project_location)
+    else:
+        w = WorkspaceClient()
+        self._deploy_volumes(file_paths, db_path, project_location, w)
+
+def _deploy_dbfs(
+    self,
+    file_paths: List[str],
+    db_path: str,
+    project_location: str
+):
     def helper(file: str) -> None:
-        target_path = f"{dbfs_path}/{os.path.relpath(file, project_location)}"
+        target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
         execute_shell_command(f"databricks fs cp --recursive --overwrite {file} {target_path}")
         logger.info(f"Uploaded {file} to {target_path}")

     execute_for_each_multithreaded(file_paths, lambda x: helper(x))

+def _deploy_volumes(
+    self,
+    file_paths: List[str],
+    db_path: str,
+    project_location: str,
+    workspace_client
+):
+    def helper(wc, file: str) -> None:
+        # the SDK requires an absolute path
+        if not os.path.isabs(file):
+            cwd = os.getcwd()
+            file = f"{cwd}/{file}"
+        target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
+        # if the file already exists, the SDK returns the error message
+        # "The file being created already exists."; a feature request for an
+        # overwrite flag is tracked at https://github.com/databricks/databricks-sdk-py/issues/548
+        try:
+            wc.dbutils.fs.rm(target_path)
+        except Exception:
+            pass
+        # the SDK uses urllib3 to parse paths: the source needs a file:// scheme
+        # to be recognized as a local file, otherwise it raises a file-not-found error
+        wc.dbutils.fs.cp(f"file://{file}", target_path)
+        logger.info(f"Uploaded {file} to {target_path}")
+
+    execute_for_each_multithreaded(file_paths, lambda x: helper(workspace_client, x))
+
 def _create_python_project_wheel(self, project_location: str) -> (str, str):
     dist_location = f"{project_location}/dist"
     execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true")
```
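The two workarounds in `_deploy_volumes` are the heart of the volumes support. A stripped-down sketch of the same upload path, assuming databricks-sdk==0.33.0, credentials in the environment, and an illustrative volume path:

```python
import os
from databricks.sdk import WorkspaceClient

# Reads DATABRICKS_HOST / DATABRICKS_TOKEN from the environment.
w = WorkspaceClient()

# The SDK wants an absolute local path.
local_file = os.path.abspath("dist/my_project-0.1.0-py3-none-any.whl")
target = "/Volumes/my_catalog/my_schema/my_volume/db_rocket/my_project-0.1.0-py3-none-any.whl"

# fs.cp has no overwrite option yet (databricks-sdk-py issue #548),
# so delete any previous copy first; rm raises if the target is absent.
try:
    w.dbutils.fs.rm(target)
except Exception:
    pass

# The file:// scheme marks the source as local; without it the SDK
# treats the path as remote and fails with a file-not-found error.
w.dbutils.fs.cp(f"file://{local_file}", target)
```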
```diff
@@ -250,6 +302,26 @@ def _create_python_project_wheel(self, project_location: str) -> (str, str):
     wheel_path = f"{dist_location}/{wheel_file}"
     return wheel_path, wheel_file

+def get_dbfs_path(self, path: Optional[str]) -> str:
+    if path:
+        logger.warning("The `dbfs_path` parameter is planned for deprecation. Please use the `dst_path` parameter instead.")
+        if not self.is_dbfs(path):
+            raise Exception("`dbfs_path` must start with dbfs:/")
+    return path or f"dbfs:/temp/{os.environ['USER']}"
+
+def get_volumes_path(self, path: Optional[str]) -> str:
+    if path and not path.startswith("/Volumes"):
+        raise Exception("`use_volumes` is true. `dst_path` must start with /Volumes")
+    return path or f"/Volumes/main/data_products/volume/db_rocket/{os.environ['USER']}"
+
+def get_install_path(self, db_path):
+    if self.is_dbfs(db_path):
+        return f'{db_path.replace("dbfs:/", "/dbfs/")}'
+    return db_path
+
+def is_dbfs(self, db_path: str):
+    return db_path.startswith("dbfs:/")
+

 def main():
     fire.Fire(Rocket)
```
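To make the routing concrete, a few illustrative calls to the new helpers and the values they would return (a sketch; the paths are made up):

```python
rocket = Rocket()

rocket.is_dbfs("dbfs:/tmp/myteam/myproject")             # True
rocket.is_dbfs("/Volumes/main/data_products/volume/x")   # False

# DBFS destinations are rewritten to their FUSE mount so pip can install
# from the driver; volume paths are already mounted and pass through as-is.
rocket.get_install_path("dbfs:/tmp/myteam/myproject")    # "/dbfs/tmp/myteam/myproject"
rocket.get_install_path("/Volumes/main/data_products/volume/x")  # unchanged
```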
setup.py:
```diff
@@ -9,15 +9,15 @@
 setuptools.setup(
     name="databricks-rocket",
-    version="2.1.0",
+    version="3.0.0",
     author="GetYourGuide",
     author_email="engineering.data-products@getyourguide.com",
     description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment",
     long_description=readme,
     long_description_content_type="text/markdown",
     url="https://github.com/getyourguide/db-rocket",
     packages=setuptools.find_packages(),
-    install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli"],
+    install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli", "databricks-sdk==0.33.0"],
     entry_points={
         "console_scripts": ["rocket=rocket.rocket:main", "dbrocket=rocket.rocket:main"]
     },
```

**Review comment:** Do we need to pin the version?

**Reply:** As you mentioned, this SDK is still in beta, and when integrating it I looked into the project, which seems to have a lot of things under development, so pinning the version guarantees things don't break.
**Review comment:** Is this needed?

**Reply:** Do you mean `use_volumes`? We don't want to change the current behaviour, only add an option for using volumes.