Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add artifact repository storage capabilities #33

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Update to MLflow 2.7.1
- Improve `table_exists()` in `example_merlion.py`
- SQLAlchemy: Use server-side `now()` function for "autoincrement" columns
- Add artifact repository storage capabilities

## 2023-09-12 0.1.1
- Documentation: Improve "Container Usage" page
Expand Down
97 changes: 97 additions & 0 deletions mlflow_cratedb/adapter/cratedb_artifact_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import datetime as dt
import os
import posixpath
from functools import lru_cache

from mlflow.entities import FileInfo
from mlflow.store.artifact.artifact_repo import ArtifactRepository, verify_artifact_path
from mlflow.store.artifact.artifact_repository_registry import _artifact_repository_registry
from mlflow.store.artifact.http_artifact_repo import HttpArtifactRepository
from mlflow.tracking._tracking_service.utils import _get_default_host_creds

from mlflow_cratedb.contrib.object_store import CrateDBObjectStore, decode_sqlalchemy_url

_MAX_CACHE_SECONDS = 300


def _get_utcnow_timestamp():
return dt.datetime.utcnow().timestamp()


@lru_cache(maxsize=64)
def _cached_get_cratedb_client(url, timestamp): # pylint: disable=unused-argument
"""
A cached `CrateDBObjectStore` client instance.

Caching is important so that there will be a dedicated client instance per
endpoint URL/bucket. Otherwise, a new client instance, with a corresponding
database connection, would be created on each operation.

Similar to the S3 client wrapper, in order to manage expired/stale
connections well, expire the connection regularly by using the
`timestamp` parameter to invalidate the function cache.
"""
store = CrateDBObjectStore(url=url)
store.connect()
return store


def _get_cratedb_client(url):
# Invalidate cache every `_MAX_CACHE_SECONDS`.
timestamp = int(_get_utcnow_timestamp() / _MAX_CACHE_SECONDS)

return _cached_get_cratedb_client(url, timestamp)


class CrateDBArtifactRepository(ArtifactRepository):
"""
Stores artifacts into a CrateDB object store.

crate://crate@localhost:4200/bucket-one?schema=testdrive
"""

ROOT_PATH = ""

def __init__(self, artifact_uri):
super().__init__(artifact_uri)
# Decode for verification purposes, in order to fail early.
decode_sqlalchemy_url(artifact_uri)

def _get_cratedb_client(self):
return _get_cratedb_client(url=self.artifact_uri)

@property
def _host_creds(self):
return _get_default_host_creds(self.artifact_uri)

def log_artifact(self, local_file, artifact_path=None):
verify_artifact_path(artifact_path)

dest_path = self.ROOT_PATH
if artifact_path:
dest_path = posixpath.join(self.ROOT_PATH, artifact_path)
dest_path = posixpath.join(dest_path, os.path.basename(local_file))
with open(local_file, "rb") as f:
self._get_cratedb_client().upload(dest_path, f.read())

def log_artifacts(self, local_dir, artifact_path=None):
HttpArtifactRepository.log_artifacts(self, local_dir, artifact_path=artifact_path)

def list_artifacts(self, path=None):
# CrateDBObjectStore.list() already returns tuples of `(path, is_dir, size)`,
# so the convergence to MLflow's `FileInfo` objects is straight-forward.
infos = []
for entry in self._get_cratedb_client().list(path):
infos.append(FileInfo(*entry))
return sorted(infos, key=lambda f: f.path)

def _download_file(self, remote_file_path, local_path):
payload = self._get_cratedb_client().download(remote_file_path)
with open(local_path, "wb") as f:
f.write(payload)

def delete_artifacts(self, artifact_path=None):
self._get_cratedb_client().delete(artifact_path)


_artifact_repository_registry.register("crate", CrateDBArtifactRepository)
Empty file.
254 changes: 254 additions & 0 deletions mlflow_cratedb/contrib/blob_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""
## About
Access CrateDB's BLOB store from the command-line.

## Usage

For convenient interactive use, define two environment variables.
When not defining `--url` or `CRATEDB_HTTP_URL`, the program will
connect to CrateDB at `crate@localhost:4200` by default.

Synopsis::

# Define the HTTP URL to your CrateDB instance.
export CRATEDB_HTTP_URL=https://username:password@cratedb.example.net:4200/

# Define the BLOB container name.
export CRATEDB_BLOB_CONTAINER=testdrive

# Upload an item to the BLOB store.
python blob_store.py upload /path/to/file
418a0143404fb2da8a1464ab721f6d5fb50c3b96

# Download an item from the BLOB store.
python blob_store.py download 418a0143404fb2da8a1464ab721f6d5fb50c3b96

Full command line example, without defining environment variables::

python blob_store.py \
--url=http://crate@localhost:4200/ --container=testdrive \
upload /path/to/file

## References

- https://cratedb.com/docs/python/en/latest/blobs.html
- https://cratedb.com/docs/crate/reference/en/latest/general/blobs.html
- https://cratedb.com/docs/crate/reference/en/latest/sql/statements/create-blob-table.html
"""
import io
import logging
import os
import sys
import typing as t
from argparse import ArgumentError, ArgumentParser
from pathlib import Path

from crate import client
from crate.client.blob import BlobContainer
from crate.client.connection import Connection
from crate.client.exceptions import ProgrammingError

logger = logging.getLogger(__name__)


class CrateDBBlobContainer:
"""
A wrapper around CrateDB's BLOB store.
"""

def __init__(self, url: str, name: str):
self.url = url
self.name = name
self.connection: Connection = None
self.container: BlobContainer = None

def connect(self):
self.connection = client.connect(self.url)
self.provision()
self.container = self.connection.get_blob_container(self.name)

def disconnect(self):
self.connection.close()

def provision(self):
"""
Create a new table for storing Binary Large Objects (BLOBs).

TODO: Submit issue about `CREATE BLOB TABLE IF NOT EXISTS` to crate/crate.
TODO: Also propagate schema name here, and check if addressing does
even work using the low-level Python API.

- https://cratedb.com/docs/crate/reference/en/latest/sql/statements/create-blob-table.html
"""
try:
self.run_sql(f'CREATE BLOB TABLE "{self.name}";')
except ProgrammingError as ex:
if "RelationAlreadyExists" not in ex.message:
raise

def upload(self, payload: t.Union[bytes, bytearray]) -> str:
"""
Upload an item to the BLOB store.

- https://cratedb.com/docs/python/en/latest/blobs.html#upload-blobs
- https://cratedb.com/docs/crate/reference/en/latest/general/blobs.html#uploading
"""
file = io.BytesIO(payload)
return self.container.put(file)

def download(self, digest: str) -> bytes:
"""
Download an item from the BLOB store.

- https://cratedb.com/docs/python/en/latest/blobs.html#retrieve-blobs
- https://cratedb.com/docs/crate/reference/en/latest/general/blobs.html#downloading
"""
payload = b""
for chunk in self.container.get(digest):
payload += chunk
return payload

def delete(self, digest: str) -> bool:
"""
Delete an item from the BLOB store.
"""
return self.container.delete(digest)

def refresh(self):
"""
Optionally synchronize data after write operations.
"""
self.run_sql(f'REFRESH TABLE "{self.name}";')

def run_sql(self, sql: str):
"""
Run SQL statements on the connection.
"""
return self.connection.cursor().execute(sql)

def __enter__(self):
self.connect()
return self

def __exit__(self, *excs):
self.disconnect()


def setup_logging(level=logging.INFO):
"""
What the function name says.
"""
log_format = "%(asctime)-15s [%(name)-10s] %(levelname)-8s: %(message)s"
logging.basicConfig(format=log_format, stream=sys.stderr, level=level)


def truncate(value: bytes) -> bytes:
"""
Truncate long string.

https://stackoverflow.com/a/2873416
"""
ellipsis_ = b"..."
maxlength = 100
strlength = maxlength - len(ellipsis_)
return value[:strlength] + (value[strlength:] and ellipsis_)


def example(url: str, container_name: str):
"""
An example conversation with the BLOB store (upload, download, delete).
"""

# Define arbitrary content for testing purposes.
content = "An example payload.".encode("utf-8")

# Upload and re-download content payload.
logger.info(f"Uploading: {truncate(content)!r}")
with CrateDBBlobContainer(url=url, name=container_name) as container:
identifier = container.upload(content)
logger.info(f"Identifier: {identifier}")

downloaded = container.download(identifier)
logger.info(f"Downloaded: {truncate(downloaded)!r}")

deleted = container.delete(identifier)
logger.info(f"Deleted: {deleted}")


def read_arguments():
parser = ArgumentParser()
url = parser.add_argument("-u", "--url", type=str)
container = parser.add_argument("-c", "--container", type=str)

actions = parser.add_subparsers(
dest="action",
title="action",
description="valid subcommands",
help="additional help",
)
upload = actions.add_parser("upload", aliases=["up", "put"])
download = actions.add_parser("download", aliases=["down", "get"])
delete = actions.add_parser("delete", aliases=["del", "rm"])

path = upload.add_argument("path", type=Path)
download.add_argument("digest", type=str)
delete.add_argument("digest", type=str)

parser.set_defaults(url=os.environ.get("CRATEDB_HTTP_URL", "http://crate@localhost:4200/"))
parser.set_defaults(container=os.environ.get("CRATEDB_BLOB_CONTAINER"))

args = parser.parse_args()

if not args.url:
raise ArgumentError(
url,
"URL to database not given or empty. " "Use `--url` or `CRATEDB_HTTP_URL` environment variable",
)

if not args.container:
raise ArgumentError(
container,
"BLOB container name not given or empty. "
"Use `--container` or `CRATEDB_BLOB_CONTAINER` environment variable",
)

if not args.action:
raise ArgumentError(actions, "Action not given: Use one of {upload,download,delete}")

if args.action == "upload" and not args.path.exists():
raise ArgumentError(path, f"Path does not exist: {args.path}")

if args.action in ["download", "delete"] and not args.digest:
raise ArgumentError(path, "BLOB digest not given")

return args


def main():
args = read_arguments()
with CrateDBBlobContainer(url=args.url, name=args.container) as container:
if args.action == "upload":
payload = args.path.read_bytes()
logger.info(f"Upload: {truncate(payload)!r}")
digest = container.upload(payload)
print(digest) # noqa: T201
elif args.action == "download":
payload = container.download(args.digest)
sys.stdout.buffer.write(payload)
elif args.action == "delete":
container.delete(args.digest)
else:
raise KeyError(f"Action not implemented: {args.action}")


def run_example():
example(
url="http://crate@localhost:4200/",
container_name="testdrive",
)


if __name__ == "__main__":
setup_logging()
# run_example() # noqa: ERA001
main()
Loading