Skip to content

Commit

Permalink
[MLFlowObjectStore] [1/2] Base implementation for MLFlowObjectStore (#…
Browse files Browse the repository at this point in the history
…2802)

* Implementation of MLFlowObjectStore

* Update object store test settings

* Import mlflow dependencies inline

* Fix tests and ignore some pyright

* Bugfix

* Enforce experiment and run in get_artifact_path

* Update placeholders

* Make logs debug instead of info

* Minor PR comments

* MLflow casing

* tracking_uri fixes

* Update comments

* Update placeholders

* Fix tests

* Fix pyright

* Use tempfile for temp dirs

* Read tracking uri env var directly

* Remove dist from MLFlowObjectStore

---------

Co-authored-by: Daniel King <43149077+dakinggg@users.noreply.github.com>
  • Loading branch information
jerrychen109 and dakinggg authored Jan 8, 2024
1 parent a36fb74 commit f50dcaf
Show file tree
Hide file tree
Showing 8 changed files with 743 additions and 14 deletions.
6 changes: 4 additions & 2 deletions composer/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
from composer.utils.iter_helpers import IteratorFileStream, ensure_tuple, map_collection
from composer.utils.misc import (create_interval_scheduler, get_free_tcp_port, is_model_deepspeed, is_model_fsdp,
is_notebook, model_eval_mode, using_torch_2)
from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, ObjectStore, ObjectStoreTransientError,
OCIObjectStore, S3ObjectStore, SFTPObjectStore, UCObjectStore)
from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, MLFlowObjectStore, ObjectStore,
ObjectStoreTransientError, OCIObjectStore, S3ObjectStore, SFTPObjectStore,
UCObjectStore)
from composer.utils.retrying import retry
from composer.utils.string_enum import StringEnum

Expand All @@ -44,6 +45,7 @@
'OCIObjectStore',
'GCSObjectStore',
'UCObjectStore',
'MLFlowObjectStore',
'MissingConditionalImportError',
'import_object',
'is_model_deepspeed',
Expand Down
12 changes: 10 additions & 2 deletions composer/utils/object_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@

from composer.utils.object_store.gcs_object_store import GCSObjectStore
from composer.utils.object_store.libcloud_object_store import LibcloudObjectStore
from composer.utils.object_store.mlflow_object_store import MLFlowObjectStore
from composer.utils.object_store.object_store import ObjectStore, ObjectStoreTransientError
from composer.utils.object_store.oci_object_store import OCIObjectStore
from composer.utils.object_store.s3_object_store import S3ObjectStore
from composer.utils.object_store.sftp_object_store import SFTPObjectStore
from composer.utils.object_store.uc_object_store import UCObjectStore

__all__ = [
'ObjectStore', 'ObjectStoreTransientError', 'LibcloudObjectStore', 'S3ObjectStore', 'SFTPObjectStore',
'OCIObjectStore', 'GCSObjectStore', 'UCObjectStore'
'ObjectStore',
'ObjectStoreTransientError',
'LibcloudObjectStore',
'MLFlowObjectStore',
'S3ObjectStore',
'SFTPObjectStore',
'OCIObjectStore',
'GCSObjectStore',
'UCObjectStore',
]
383 changes: 383 additions & 0 deletions composer/utils/object_store/mlflow_object_store.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion composer/utils/object_store/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def upload_object(self,
Args:
object_name (str): Object name (where object will be stored in the container)
filename (str | pathlib.Path): Path the the object on disk
filename (str | pathlib.Path): Path to the object on disk
callback ((int, int) -> None, optional): If specified, the callback is periodically called with the number of bytes
uploaded and the total size of the object being uploaded.
**kwargs: other arguments to the upload object function are supported
Expand Down Expand Up @@ -133,6 +133,7 @@ def download_object(
downloaded and the total size of the object.
Raises:
FileExistsError: If ``filename`` already exists and ``overwrite`` is ``False``.
FileNotFoundError: If the file was not found in the object store.
ObjectStoreTransientError: If there was a transient connection issue with downloading the object.
"""
Expand Down
14 changes: 11 additions & 3 deletions composer/utils/object_store/uc_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, path: str) -> None:
try:
from databricks.sdk import WorkspaceClient
except ImportError as e:
raise MissingConditionalImportError('databricks', conda_package='databricks-sdk>=0.8.0,<1.0') from e
raise MissingConditionalImportError('databricks', conda_package='databricks-sdk>=0.15.0,<1.0') from e

try:
self.client = WorkspaceClient()
Expand Down Expand Up @@ -167,7 +167,10 @@ def download_object(self,
try:
from databricks.sdk.core import DatabricksError
try:
with self.client.files.download(self._get_object_path(object_name)).contents as resp:
contents = self.client.files.download(self._get_object_path(object_name)).contents
assert contents is not None

with contents as resp: # pyright: ignore
with open(tmp_path, 'wb') as f:
# Chunk the data into multiple blocks of 64MB to avoid
# OOMs when downloading really large files
Expand Down Expand Up @@ -199,11 +202,15 @@ def get_object_size(self, object_name: str) -> int:
Raises:
FileNotFoundError: If the file was not found in the object store.
IsADirectoryError: If the object is a directory, not a file.
"""
from databricks.sdk.core import DatabricksError
try:
file_info = self.client.files.get_status(self._get_object_path(object_name))
return file_info.file_size
if file_info.is_dir:
raise IsADirectoryError(f'{object_name} is a UC directory, not a file.')

return file_info.file_size # pyright: ignore
except DatabricksError as e:
_wrap_errors(self.get_uri(object_name), e)

Expand Down Expand Up @@ -231,6 +238,7 @@ def list_objects(self, prefix: Optional[str]) -> List[str]:
path=self._UC_VOLUME_LIST_API_ENDPOINT,
data=data,
headers={'Source': 'mosaicml/composer'})
assert isinstance(resp, dict)
return [f['path'] for f in resp.get('files', []) if not f['is_dir']]
except DatabricksError as e:
_wrap_errors(self.get_uri(prefix), e)
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def package_files(prefix: str, directory: str, extension: str):
break
else:
assert end != -1, 'there should be a balanced number of start and ends'
long_description = long_description[:start] + long_description[end + len(end_tag):]
long_description = long_description[:start] + \
long_description[end + len(end_tag):]

install_requires = [
'pyyaml>=6.0,<7',
Expand Down Expand Up @@ -223,7 +224,7 @@ def package_files(prefix: str, directory: str, extension: str):
]

extra_deps['mlflow'] = [
'mlflow>=2.9.0,<3.0',
'mlflow>=2.9.2,<3.0',
]

extra_deps['pandas'] = ['pandas>=2.0.0,<3.0']
Expand Down
9 changes: 5 additions & 4 deletions tests/utils/object_store/object_store_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import composer.utils.object_store
import composer.utils.object_store.sftp_object_store
from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, ObjectStore, OCIObjectStore,
S3ObjectStore, SFTPObjectStore, UCObjectStore)
from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, MLFlowObjectStore, ObjectStore,
OCIObjectStore, S3ObjectStore, SFTPObjectStore, UCObjectStore)
from composer.utils.object_store.sftp_object_store import SFTPObjectStore
from tests.common import get_module_subclasses

Expand Down Expand Up @@ -56,8 +56,9 @@
object_stores = [
pytest.param(x, marks=_object_store_marks[x], id=x.__name__)
for x in get_module_subclasses(composer.utils.object_store, ObjectStore)
# Note: OCI, GCS and UC have their own test suite, so they are exempt from being included in this one.``
if not issubclass(x, OCIObjectStore) and not issubclass(x, GCSObjectStore) and not issubclass(x, UCObjectStore)
# Note: OCI, GCS, UC, and MLFlow have their own test suite, so they are exempt from being included in this one.``
if not issubclass(x, OCIObjectStore) and not issubclass(x, GCSObjectStore) and not issubclass(x, UCObjectStore) and
not issubclass(x, MLFlowObjectStore)
]


Expand Down
Loading

0 comments on commit f50dcaf

Please sign in to comment.