add uc volumes support #82

Merged 5 commits on Oct 7, 2024
Changes from 3 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog db-rocket

## Version 2.2.0
- Add `use_volumes` and `dst_path` arguments to support uploading to Unity Catalog Volumes.

## Version 2.1.0
- New parameter ``rocket launch --glob_path=<...>``, which allows specifying a list of globs for files to deploy during launch.

3 changes: 2 additions & 1 deletion requirements.txt
@@ -9,4 +9,5 @@ poetry
mypy
SecretStorage
readme-renderer
twine
twine
databricks-sdk==0.33.0
100 changes: 80 additions & 20 deletions rocket/rocket.py
@@ -1,5 +1,4 @@
import os
import glob
from typing import Optional, List, Union

import fire
@@ -54,49 +53,53 @@ def launch(
project_location: str = ".",
dbfs_path: Optional[str] = None,
watch: bool = True,
glob_path: Optional[Union[str, List[str]]] = None
glob_path: Optional[Union[str, List[str]]] = None,
use_volumes: Optional[bool] = False,
Contributor: Is this needed?

Contributor Author: Do you mean `use_volumes`? We don't want to change the current behaviour, but only add an option for using volumes.

dst_path: Optional[str] = None,
Contributor: Maybe we deprecate dbfs_path, since you can cover the use case with dst_path.

Contributor Author: I added a warning there so people can still use it for now, but we can deprecate it in the next version.

) -> None:
"""
Entrypoint of the application, triggers a build and deploy
:param project_location: path to project code, default: `"."`
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject
:param dbfs_path: path where the wheel will be stored, ex: dbfs:/tmp/myteam/myproject. Only supports dbfs paths.
:param watch: Set to false if you don't want to automatically sync your files
:param glob_path: glob string or list of strings for additional files to deploy, e.g. "*.json"
:param use_volumes: upload files to Unity Catalog Volumes.
:param dst_path: Destination path to store the files. Supports both dbfs:/ and /Volumes paths. Ideally, dst_path should be used going forward and dbfs_path deprecated.
:return:
"""
if os.getenv("DATABRICKS_TOKEN") is None:
raise Exception("DATABRICKS_TOKEN must be set for db-rocket to work")

if dbfs_path is not None and not dbfs_path.startswith("dbfs:/"):
raise Exception("`dbfs_path` must start with dbfs:/")

try:
execute_shell_command(f"databricks fs ls dbfs:/")
Contributor: Can we add some validation here that the Databricks workspace API works?

Contributor: Good point, I've added some :)
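For illustration, a minimal sketch of what such a workspace API check could look like, assuming the databricks-sdk `WorkspaceClient` reads `DATABRICKS_HOST`/`DATABRICKS_TOKEN` from the environment; the exact validation added in response to this comment is not shown in this view of the diff:

```python
from databricks.sdk import WorkspaceClient

def validate_workspace_access() -> None:
    """Fail fast if the Databricks workspace API is unreachable or unauthenticated."""
    try:
        # current_user.me() is a cheap authenticated call that proves the
        # configured host and token are valid for the workspace API.
        WorkspaceClient().current_user.me()
    except Exception as e:
        raise Exception(
            f"Error accessing the Databricks workspace API. Check that your "
            f"databricks host and token are set and valid. Error details: {e}"
        )
```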

except Exception as e:
raise Exception(
f"Error accessing DBFS via databricks-cli. Please check if your databricks token is set and valid? Try to generate a new token and update existing one with `databricks configure --token`. Error details: {e}"
)

if not dbfs_path:
dbfs_path = f"dbfs:/temp/{os.environ['USER']}"
if use_volumes:
Contributor: Maybe something like `if is_dbfs_path`?

Contributor: If we did that, we'd need to choose some default behavior in case no path is provided (since dst_path is optional), so it would have to be something like `if dst_path is not None and is_dbfs_path(dst_path)`. And so you wouldn't be able to upload to dbfs without providing some path, which would be backwards incompatible.

db_path = self.get_volumes_path(dst_path)
else:
path_to_use = dst_path if dst_path else dbfs_path
db_path = self.get_dbfs_path(path_to_use)

if watch:
project_name = os.path.abspath(project_location).split("/")[-1]
dbfs_path = f"{dbfs_path}/{project_name}"
db_path = f"{db_path}/{project_name}"

glob_paths = []
if isinstance(glob_path, str):
glob_paths = [os.path.join(project_location, glob_path)]
elif isinstance(glob_path, list):
glob_paths = [os.path.join(project_location, path) for path in glob_path]

self._build_and_deploy(watch=watch, project_location=project_location, dbfs_path=dbfs_path, glob_paths=glob_paths)
self._build_and_deploy(watch=watch, project_location=project_location, db_path=db_path, glob_paths=glob_paths)
if watch:
watcher = FileWatcher(
project_location,
lambda x: self._build_and_deploy(
watch=watch,
modified_files=watcher.modified_files,
dbfs_path=dbfs_path,
db_path=db_path,
project_location=project_location,
glob_paths=glob_path
),
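For reference, a usage sketch of the new arguments, assuming `Rocket` can be instantiated with no constructor arguments (as implied by `fire.Fire(Rocket)` further down); the volume path is illustrative only. From the CLI this corresponds to something like `rocket launch --use_volumes=True --dst_path=/Volumes/...`.

```python
from rocket.rocket import Rocket

# Default behaviour is unchanged: the wheel goes to dbfs:/temp/$USER.
Rocket().launch(project_location=".", watch=False)

# New in this PR: upload to a Unity Catalog Volume instead of DBFS.
Rocket().launch(
    project_location=".",
    watch=False,
    use_volumes=True,
    dst_path="/Volumes/main/my_schema/my_volume/db_rocket",  # example path
)
```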
@@ -108,15 +111,15 @@ def _build_and_deploy(
self,
watch: bool,
project_location: str,
dbfs_path: str,
db_path: str,
modified_files: Optional[List[str]] = None,
glob_paths: Optional[List[str]] = None
) -> None:
if modified_files:
logger.info(f"Found changes in {modified_files}. Overwriting them.")
self._deploy(
file_paths=modified_files,
dbfs_path=dbfs_path,
db_path=db_path,
project_location=project_location,
)
return
@@ -128,10 +131,10 @@ def _build_and_deploy(
wheel_path, wheel_file = self._create_python_project_wheel(project_location)
self._deploy(
file_paths=[wheel_path],
dbfs_path=dbfs_path,
db_path=db_path,
project_location=os.path.dirname(wheel_path),
)
install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}/{wheel_file}'
install_path = f"{self.get_install_path(db_path)}/{wheel_file}"

dependency_files = ["requirements.in", "requirements.txt"]
index_urls = []
@@ -183,10 +186,10 @@ def _build_and_deploy(
line.strip() for line in f.readlines() if "index-url" in line
]
self._deploy(
file_paths=list(files), dbfs_path=dbfs_path, project_location=project_location
file_paths=list(files), db_path=db_path, project_location=project_location
)

install_path = f'{dbfs_path.replace("dbfs:/", "/dbfs/")}'
install_path = self.get_install_path(db_path)
index_urls_options = " ".join(index_urls)

if dependency_file_exist:
@@ -215,16 +218,55 @@ def _build_and_deploy(
def _deploy(
self,
file_paths: List[str],
dbfs_path: str,
db_path: str,
project_location: str
) -> None:
if self.is_dbfs(db_path):
self._deploy_dbfs(file_paths, db_path, project_location)
else:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
self._deploy_volumes(file_paths, db_path, project_location, w)

def _deploy_dbfs(
self,
file_paths: List[str],
db_path: str,
project_location: str
):
def helper(file: str) -> None:
target_path = f"{dbfs_path}/{os.path.relpath(file, project_location)}"
target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
execute_shell_command(f"databricks fs cp --recursive --overwrite {file} {target_path}")
logger.info(f"Uploaded {file} to {target_path}")

execute_for_each_multithreaded(file_paths, lambda x: helper(x))

def _deploy_volumes(
self,
file_paths: List[str],
db_path: str,
project_location: str,
workspace_client
):
def helper(wc, file: str) -> None:
# the SDK requires an absolute path
if not os.path.isabs(file):
cwd = os.getcwd()
file = f"{cwd}/{file}"
target_path = f"{db_path}/{os.path.relpath(file, project_location)}"
# if the file already exists, the SDK fails with: "The file being created already exists."
# a feature request for overwriting has already been filed: https://github.com/databricks/databricks-sdk-py/issues/548
try:
wc.dbutils.fs.rm(target_path)
except Exception:
pass
# the SDK uses urllib3 to parse paths.
# The path needs a file:// prefix to be recognized as a local file; otherwise a file-does-not-exist error is raised.
wc.dbutils.fs.cp(f"file://{file}", target_path)
logger.info(f"Uploaded {file} to {target_path}")

execute_for_each_multithreaded(file_paths, lambda x: helper(workspace_client, x))

def _create_python_project_wheel(self, project_location: str) -> (str, str):
dist_location = f"{project_location}/dist"
execute_shell_command(f"rm {dist_location}/* 2>/dev/null || true")
@@ -250,6 +292,24 @@ def _create_python_project_wheel(self, project_location: str) -> (str, str):
wheel_path = f"{dist_location}/{wheel_file}"
return wheel_path, wheel_file

def get_dbfs_path(self, path: Optional[str]) -> str:
if path and not self.is_dbfs(path):
raise Exception("`dbfs_path` must start with dbfs:/")
return path or f"dbfs:/temp/{os.environ['USER']}"

def get_volumes_path(self, path: Optional[str]) -> str:
if path and not path.startswith("/Volumes"):
raise Exception("`use_volumes` is true. `dst_path` must start with /Volumes")
return path or f"/Volumes/main/data_products/volume/db_rocket/{os.environ['USER']}"

def get_install_path(self, db_path: str) -> str:
if self.is_dbfs(db_path):
return f'{db_path.replace("dbfs:/", "/dbfs/")}'
return db_path

def is_dbfs(self, db_path: str) -> bool:
return db_path.startswith("dbfs:/")


def main():
fire.Fire(Rocket)
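A short sketch of how the new path helpers behave, again assuming a no-argument `Rocket()` constructor; the /Volumes path is an example, not a required layout:

```python
from rocket.rocket import Rocket

r = Rocket()

r.is_dbfs("dbfs:/tmp/myteam/myproject")                   # True
r.is_dbfs("/Volumes/main/my_schema/my_volume")            # False

# dbfs:/ paths are rewritten to their /dbfs/ FUSE mount for pip installs;
# Volumes paths are returned unchanged.
r.get_install_path("dbfs:/tmp/myteam/myproject")          # "/dbfs/tmp/myteam/myproject"
r.get_install_path("/Volumes/main/my_schema/my_volume")   # unchanged

# With no path given, the defaults defined above are used.
r.get_dbfs_path(None)     # dbfs:/temp/<USER>
r.get_volumes_path(None)  # /Volumes/main/data_products/volume/db_rocket/<USER>
```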
4 changes: 2 additions & 2 deletions setup.py
@@ -9,15 +9,15 @@

setuptools.setup(
name="databricks-rocket",
version="2.1.0",
version="2.2.0",
Contributor: Let's set it as a major version update.

Contributor Author: So bump it to 3.0.0?

Contributor: It's backwards compatible, so I would think it should be a minor version update.

author="GetYourGuide",
author_email="engineering.data-products@getyourguide.com",
description="Keep your local python scripts installed and in sync with a databricks notebook. Shortens the feedback loop to develop projects using a hybrid enviroment",
long_description=readme,
long_description_content_type="text/markdown",
url="https://github.com/getyourguide/db-rocket",
packages=setuptools.find_packages(),
install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli"],
install_requires=["fire", "watchdog~=2.1.9", "build", "databricks_cli", "databricks-sdk==0.33.0"],
Contributor: Do we need to pin the version?

Contributor Author: As you mentioned, this SDK is still in beta. When integrating it I looked into the project, and a lot of things seem to be under development, so pinning the version guarantees things don't break.

entry_points={
"console_scripts": ["rocket=rocket.rocket:main", "dbrocket=rocket.rocket:main"]
},