From e2e84b1bbf0f6cb0a2f12c48bdd26d28aa7ecc1a Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Tue, 18 Jun 2024 20:57:08 +0800 Subject: [PATCH] [#2059] feat(client-python): Support Gravitino Virtual FileSystem in Python (#3528) ### What changes were proposed in this pull request? Support Gravitino Virtual File System in Python so that we can read and write Fileset storage data. The first PR only supports HDFS. After research, the following popular cloud storages or companies have implemented their own FileSystem based on fsspec(https://filesystem-spec.readthedocs.io/en/latest/index.html): 1. S3(https://github.com/fsspec/s3fs) 2. Azure(https://github.com/fsspec/adlfs) 3. Gcs(https://github.com/fsspec/gcsfs) 4. OSS(https://github.com/fsspec/ossfs) 5. Databricks(https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/dbfs.py) 6. Snowflake(https://github.com/snowflakedb/snowflake-ml-python), So this PR will implement GVFS based on the fsspec interface. ### Why are the changes needed? Fix: #2059 ### How was this patch tested? Add some UTs and ITs. --------- Co-authored-by: xiaojiebao --- LICENSE.bin | 4 + clients/client-python/gravitino/__init__.py | 1 + .../gravitino/filesystem/__init__.py | 4 + .../gravitino/filesystem/gvfs.py | 672 ++++++++++++ clients/client-python/requirements-dev.txt | 4 + clients/client-python/requirements.txt | 6 +- .../tests/unittests/mock_base.py | 84 ++ .../tests/unittests/test_gvfs_with_local.py | 987 ++++++++++++++++++ 8 files changed, 1761 insertions(+), 1 deletion(-) create mode 100644 clients/client-python/gravitino/filesystem/__init__.py create mode 100644 clients/client-python/gravitino/filesystem/gvfs.py create mode 100644 clients/client-python/tests/unittests/mock_base.py create mode 100644 clients/client-python/tests/unittests/test_gvfs_with_local.py diff --git a/LICENSE.bin b/LICENSE.bin index 60db5658127..2bb7b2902f0 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -355,6 +355,7 @@ XNIO API WildFly Confluent Kafka Streams Examples + Apache Arrow This product bundles various third-party components also under the Apache Software Foundation License 1.1 @@ -382,6 +383,7 @@ ParaNamer RE2/J ZSTD JNI + fsspec This product bundles various third-party components also under the MIT license @@ -393,6 +395,8 @@ Protocol Buffers Treelayout Kyligence/kylinpy + elarivie/pyReaderWriterLock + tkem/cachetools This product bundles various third-party components also under the Common Development and Distribution License 1.0 diff --git a/clients/client-python/gravitino/__init__.py b/clients/client-python/gravitino/__init__.py index 24db62a279b..21876e0600a 100644 --- a/clients/client-python/gravitino/__init__.py +++ b/clients/client-python/gravitino/__init__.py @@ -13,3 +13,4 @@ from gravitino.client.gravitino_admin_client import GravitinoAdminClient from gravitino.client.gravitino_metalake import GravitinoMetalake from gravitino.name_identifier import NameIdentifier +from gravitino.filesystem import gvfs diff --git a/clients/client-python/gravitino/filesystem/__init__.py b/clients/client-python/gravitino/filesystem/__init__.py new file mode 100644 index 00000000000..5779a3ad252 --- /dev/null +++ b/clients/client-python/gravitino/filesystem/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py new file mode 100644 index 00000000000..8e978fb0f14 --- /dev/null +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -0,0 +1,672 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" + +from enum import Enum +from pathlib import PurePosixPath +from typing import Dict, Tuple +import re +import fsspec + +from cachetools import TTLCache +from fsspec import AbstractFileSystem +from fsspec.implementations.local import LocalFileSystem +from fsspec.implementations.arrow import ArrowFSWrapper +from fsspec.utils import infer_storage_options +from pyarrow.fs import HadoopFileSystem +from readerwriterlock import rwlock +from gravitino.api.catalog import Catalog +from gravitino.api.fileset import Fileset +from gravitino.client.gravitino_client import GravitinoClient +from gravitino.exceptions.gravitino_runtime_exception import GravitinoRuntimeException +from gravitino.name_identifier import NameIdentifier + +PROTOCOL_NAME = "gvfs" + + +class StorageType(Enum): + HDFS = "hdfs" + LOCAL = "file" + + +class FilesetContext: + """A context object that holds the information about the fileset and the file system which used in + the GravitinoVirtualFileSystem's operations. + """ + + def __init__( + self, + name_identifier: NameIdentifier, + fileset: Fileset, + fs: AbstractFileSystem, + storage_type: StorageType, + actual_path: str, + ): + self._name_identifier = name_identifier + self._fileset = fileset + self._fs = fs + self._storage_type = storage_type + self._actual_path = actual_path + + def get_name_identifier(self): + return self._name_identifier + + def get_fileset(self): + return self._fileset + + def get_fs(self): + return self._fs + + def get_actual_path(self): + return self._actual_path + + def get_storage_type(self): + return self._storage_type + + +class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem): + """This is a virtual file system which users can access `fileset` and + other resources. + + It obtains the actual storage location corresponding to the resource from the + Gravitino server, and creates an independent file system for it to act as an agent for users to + access the underlying storage. + """ + + # Override the parent variable + protocol = PROTOCOL_NAME + _identifier_pattern = re.compile("^fileset/([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$") + + def __init__( + self, + server_uri=None, + metalake_name=None, + cache_size=20, + cache_expired_time=300, + **kwargs, + ): + self._metalake = metalake_name + self._client = GravitinoClient( + uri=server_uri, metalake_name=metalake_name, check_version=False + ) + self._cache = TTLCache(maxsize=cache_size, ttl=cache_expired_time) + self._cache_lock = rwlock.RWLockFair() + + super().__init__(**kwargs) + + @property + def fsid(self): + return PROTOCOL_NAME + + def sign(self, path, expiration=None, **kwargs): + """We do not support to create a signed URL representing the given path in gvfs.""" + raise GravitinoRuntimeException( + "Sign is not implemented for Gravitino Virtual FileSystem." + ) + + def ls(self, path, detail=True, **kwargs): + """List the files and directories info of the path. 
+        :param path: Virtual fileset path
+        :param detail: Whether to show the details for the files and directories info
+        :param kwargs: Extra args
+        :return If detail is true, returns a list of file info dicts, otherwise a list of file paths
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        if detail:
+            entries = [
+                self._convert_actual_info(entry, context)
+                for entry in context.get_fs().ls(
+                    self._strip_storage_protocol(
+                        context.get_storage_type(), context.get_actual_path()
+                    ),
+                    detail=True,
+                )
+            ]
+            return entries
+        entries = [
+            self._convert_actual_path(entry_path, context)
+            for entry_path in context.get_fs().ls(
+                self._strip_storage_protocol(
+                    context.get_storage_type(), context.get_actual_path()
+                ),
+                detail=False,
+            )
+        ]
+        return entries
+
+    def info(self, path, **kwargs):
+        """Get file info.
+        :param path: Virtual fileset path
+        :param kwargs: Extra args
+        :return A file info dict
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        actual_info: Dict = context.get_fs().info(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            )
+        )
+        return self._convert_actual_info(actual_info, context)
+
+    def exists(self, path, **kwargs):
+        """Check if a file or a directory exists.
+        :param path: Virtual fileset path
+        :param kwargs: Extra args
+        :return True if the file or directory exists, otherwise False
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        return context.get_fs().exists(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            )
+        )
+
+    def cp_file(self, path1, path2, **kwargs):
+        """Copy a file.
+        :param path1: Virtual src fileset path
+        :param path2: Virtual dst fileset path, which must have the same fileset identifier as the src path
+        :param kwargs: Extra args
+        """
+        src_path = self._pre_process_path(path1)
+        dst_path = self._pre_process_path(path2)
+        src_identifier: NameIdentifier = self._extract_identifier(src_path)
+        dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+        if src_identifier != dst_identifier:
+            raise GravitinoRuntimeException(
+                f"Destination file path identifier: `{dst_identifier}` must be the same as the src file path "
+                f"identifier: `{src_identifier}`."
+            )
+        src_context: FilesetContext = self._get_fileset_context(src_path)
+        if self._check_mount_single_file(
+            src_context.get_fileset(),
+            src_context.get_fs(),
+            src_context.get_storage_type(),
+        ):
+            raise GravitinoRuntimeException(
+                f"Cannot cp the file of the fileset: {src_identifier} which only mounts a single file."
+            )
+        dst_context: FilesetContext = self._get_fileset_context(dst_path)
+
+        src_context.get_fs().cp_file(
+            self._strip_storage_protocol(
+                src_context.get_storage_type(), src_context.get_actual_path()
+            ),
+            self._strip_storage_protocol(
+                dst_context.get_storage_type(), dst_context.get_actual_path()
+            ),
+        )
+
+    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
+        """Move a file to another directory.
+        This can move a file to another existing directory.
+        If the target directory does not exist, an exception will be thrown.
+        :param path1: Virtual src fileset path
+        :param path2: Virtual dst fileset path, which must have the same fileset identifier as the src path
+        :param recursive: Whether to move recursively
+        :param maxdepth: Maximum depth of recursive move
+        :param kwargs: Extra args
+        """
+        src_path = self._pre_process_path(path1)
+        dst_path = self._pre_process_path(path2)
+        src_identifier: NameIdentifier = self._extract_identifier(src_path)
+        dst_identifier: NameIdentifier = self._extract_identifier(dst_path)
+        if src_identifier != dst_identifier:
+            raise GravitinoRuntimeException(
+                f"Destination file path identifier: `{dst_identifier}`"
+                f" must be the same as the src file path identifier: `{src_identifier}`."
+            )
+        src_context: FilesetContext = self._get_fileset_context(src_path)
+        if self._check_mount_single_file(
+            src_context.get_fileset(),
+            src_context.get_fs(),
+            src_context.get_storage_type(),
+        ):
+            raise GravitinoRuntimeException(
+                f"Cannot mv the file of the fileset: {src_identifier} which only mounts a single file."
+            )
+        dst_context: FilesetContext = self._get_fileset_context(dst_path)
+        if src_context.get_storage_type() == StorageType.HDFS:
+            src_context.get_fs().mv(
+                self._strip_storage_protocol(
+                    src_context.get_storage_type(), src_context.get_actual_path()
+                ),
+                self._strip_storage_protocol(
+                    dst_context.get_storage_type(), dst_context.get_actual_path()
+                ),
+            )
+        elif src_context.get_storage_type() == StorageType.LOCAL:
+            src_context.get_fs().mv(
+                self._strip_storage_protocol(
+                    src_context.get_storage_type(), src_context.get_actual_path()
+                ),
+                self._strip_storage_protocol(
+                    dst_context.get_storage_type(), dst_context.get_actual_path()
+                ),
+                recursive,
+                maxdepth,
+            )
+        else:
+            raise GravitinoRuntimeException(
+                f"Storage type:{src_context.get_storage_type()} is not currently supported."
+            )
+
+    def _rm(self, path):
+        raise GravitinoRuntimeException(
+            "Deprecated method, use the `rm_file` method instead."
+        )
+
+    def rm(self, path, recursive=False, maxdepth=None):
+        """Remove a file or directory.
+        :param path: Virtual fileset path
+        :param recursive: Whether to remove the directory recursively.
+                When removing a directory, this parameter should be True.
+        :param maxdepth: The maximum depth to remove the directory recursively.
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        context.get_fs().rm(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            ),
+            recursive,
+            maxdepth,
+        )
+
+    def rm_file(self, path):
+        """Remove a file.
+        :param path: Virtual fileset path
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        context.get_fs().rm_file(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            )
+        )
+
+    def rmdir(self, path):
+        """Remove a directory.
+        For PyArrow.HadoopFileSystem, it deletes a directory and all its contents recursively.
+        For LocalFileSystem, it throws an exception when deleting a non-empty directory.
+        :param path: Virtual fileset path
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        context.get_fs().rmdir(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            )
+        )
+
+    def open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        cache_options=None,
+        compression=None,
+        **kwargs,
+    ):
+        """Open a file to read/write/append.
+        :param path: Virtual fileset path
+        :param mode: The mode now supports: rb(read), wb(write), ab(append).
See builtin ``open()``
+        :param block_size: Some indication of buffering - this is a value in bytes
+        :param cache_options: Extra arguments to pass through to the cache
+        :param compression: If given, open file using compression codec
+        :param kwargs: Extra args
+        :return A file-like object from the filesystem
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        return context.get_fs().open(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            ),
+            mode,
+            block_size,
+            cache_options,
+            compression,
+            **kwargs,
+        )
+
+    def mkdir(self, path, create_parents=True, **kwargs):
+        """Make a directory.
+        If create_parents=True, this is equivalent to ``makedirs``.
+
+        :param path: Virtual fileset path
+        :param create_parents: Create parent directories if missing when set to True
+        :param kwargs: Extra args
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        context.get_fs().mkdir(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            ),
+            create_parents,
+            **kwargs,
+        )
+
+    def makedirs(self, path, exist_ok=True):
+        """Make a directory recursively.
+        :param path: Virtual fileset path
+        :param exist_ok: Continue if the directory already exists
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        context.get_fs().makedirs(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            ),
+            exist_ok,
+        )
+
+    def created(self, path):
+        """Return the created timestamp of a file as a datetime.datetime.
+        Currently only supported for `fsspec.LocalFileSystem`.
+        :param path: Virtual fileset path
+        :return Created time(datetime.datetime)
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        if context.get_storage_type() == StorageType.LOCAL:
+            return context.get_fs().created(
+                self._strip_storage_protocol(
+                    context.get_storage_type(), context.get_actual_path()
+                )
+            )
+        raise GravitinoRuntimeException(
+            f"Storage type:{context.get_storage_type()} is not currently supported."
+        )
+
+    def modified(self, path):
+        """Return the modified time of the path file if it exists.
+        :param path: Virtual fileset path
+        :return Modified time(datetime.datetime)
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        return context.get_fs().modified(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            )
+        )
+
+    def cat_file(self, path, start=None, end=None, **kwargs):
+        """Get the content of a file.
+        :param path: Virtual fileset path
+        :param start: The offset in bytes to start reading from. It can be None.
+        :param end: The offset in bytes to end reading at. It can be None.
+        :param kwargs: Extra args
+        :return File content
+        """
+        context: FilesetContext = self._get_fileset_context(path)
+        return context.get_fs().cat_file(
+            self._strip_storage_protocol(
+                context.get_storage_type(), context.get_actual_path()
+            ),
+            start,
+            end,
+            **kwargs,
+        )
+
+    def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs):
+        """Copy a single remote file to local.
+        :param rpath: Remote file path
+        :param lpath: Local file path
+        :param callback: The callback class
+        :param outfile: The output file path
+        :param kwargs: Extra args
+        """
+        if not lpath.startswith(f"{StorageType.LOCAL.value}:") and not lpath.startswith(
+            "/"
+        ):
+            raise GravitinoRuntimeException(
+                "Copying a remote gvfs file to another remote file is not supported."
+ ) + context: FilesetContext = self._get_fileset_context(rpath) + context.get_fs().get_file( + self._strip_storage_protocol( + context.get_storage_type(), context.get_actual_path() + ), + lpath, + **kwargs, + ) + + def _convert_actual_path(self, path, context: FilesetContext): + """Convert an actual path to a virtual path. + The virtual path is like `fileset/{catalog}/{schema}/{fileset}/xxx`. + :param path: Actual path + :param context: Fileset context + :return A virtual path + """ + if context.get_storage_type() == StorageType.HDFS: + actual_prefix = infer_storage_options( + context.get_fileset().storage_location() + )["path"] + elif context.get_storage_type() == StorageType.LOCAL: + actual_prefix = context.get_fileset().storage_location()[ + len(f"{StorageType.LOCAL.value}:") : + ] + else: + raise GravitinoRuntimeException( + f"Storage type:{context.get_storage_type()} doesn't support now." + ) + + if not path.startswith(actual_prefix): + raise GravitinoRuntimeException( + f"Path {path} does not start with valid prefix {actual_prefix}." + ) + virtual_location = self._get_virtual_location(context.get_name_identifier()) + return f"{path.replace(actual_prefix, virtual_location)}" + + def _convert_actual_info(self, entry: Dict, context: FilesetContext): + """Convert a file info from an actual entry to a virtual entry. + :param entry: A dict of the actual file info + :param context: Fileset context + :return A dict of the virtual file info + """ + path = self._convert_actual_path(entry["name"], context) + return { + "name": path, + "size": entry["size"], + "type": entry["type"], + "mtime": entry["mtime"], + } + + def _get_fileset_context(self, virtual_path: str): + """Get a fileset context from the cache or the Gravitino server + :param virtual_path: The virtual path + :return A fileset context + """ + virtual_path: str = self._pre_process_path(virtual_path) + identifier: NameIdentifier = self._extract_identifier(virtual_path) + read_lock = self._cache_lock.gen_rlock() + try: + read_lock.acquire() + cache_value: Tuple[Fileset, AbstractFileSystem, StorageType] = ( + self._cache.get(identifier) + ) + if cache_value is not None: + actual_path = self._get_actual_path_by_ident( + identifier, + cache_value[0], + cache_value[1], + cache_value[2], + virtual_path, + ) + return FilesetContext( + identifier, + cache_value[0], + cache_value[1], + cache_value[2], + actual_path, + ) + finally: + read_lock.release() + + write_lock = self._cache_lock.gen_wlock() + try: + write_lock.acquire() + cache_value: Tuple[Fileset, AbstractFileSystem] = self._cache.get( + identifier + ) + if cache_value is not None: + actual_path = self._get_actual_path_by_ident( + identifier, + cache_value[0], + cache_value[1], + cache_value[2], + virtual_path, + ) + return FilesetContext( + identifier, + cache_value[0], + cache_value[1], + cache_value[2], + actual_path, + ) + fileset: Fileset = self._load_fileset_from_server(identifier) + storage_location = fileset.storage_location() + if storage_location.startswith(f"{StorageType.HDFS.value}://"): + fs = ArrowFSWrapper(HadoopFileSystem.from_uri(storage_location)) + storage_type = StorageType.HDFS + elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"): + fs = LocalFileSystem() + storage_type = StorageType.LOCAL + else: + raise GravitinoRuntimeException( + f"Storage under the fileset: `{identifier}` doesn't support now." 
+            )
+            actual_path = self._get_actual_path_by_ident(
+                identifier, fileset, fs, storage_type, virtual_path
+            )
+            self._cache[identifier] = (fileset, fs, storage_type)
+            context = FilesetContext(identifier, fileset, fs, storage_type, actual_path)
+            return context
+        finally:
+            write_lock.release()
+
+    def _extract_identifier(self, path):
+        """Extract the fileset identifier from the path.
+        :param path: The virtual fileset path
+        :return The fileset identifier
+        """
+        if path is None:
+            raise GravitinoRuntimeException(
+                "The path to extract the identifier from cannot be null or empty."
+            )
+
+        match = self._identifier_pattern.match(path)
+        if match and len(match.groups()) == 3:
+            return NameIdentifier.of_fileset(
+                self._metalake, match.group(1), match.group(2), match.group(3)
+            )
+        raise GravitinoRuntimeException(
+            f"path: `{path}` doesn't contain a valid identifier."
+        )
+
+    def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset:
+        """Load the fileset from the server.
+        If the fileset is not found on the server, a `NoSuchFilesetException` will be raised.
+        :param identifier: The fileset identifier
+        :return The fileset
+        """
+        catalog: Catalog = self._client.load_catalog(
+            NameIdentifier.of_catalog(
+                identifier.namespace().level(0), identifier.namespace().level(1)
+            )
+        )
+        return catalog.as_fileset_catalog().load_fileset(identifier)
+
+    def _get_actual_path_by_ident(
+        self,
+        identifier: NameIdentifier,
+        fileset: Fileset,
+        fs: AbstractFileSystem,
+        storage_type: StorageType,
+        virtual_path: str,
+    ):
+        """Get the actual path by the virtual path and the fileset.
+        :param identifier: The fileset identifier
+        :param fileset: The fileset
+        :param fs: The file system corresponding to the fileset storage location
+        :param storage_type: The storage type of the fileset storage location
+        :param virtual_path: The virtual fileset path
+        :return The actual path.
+        """
+        virtual_location = self._get_virtual_location(identifier)
+        storage_location = fileset.storage_location()
+        if self._check_mount_single_file(fileset, fs, storage_type):
+            if virtual_path != virtual_location:
+                raise GravitinoRuntimeException(
+                    f"Path: {virtual_path} should be the same as the virtual location: {virtual_location}"
+                    " when the fileset only mounts a single file."
+                )
+            return storage_location
+        return virtual_path.replace(virtual_location, storage_location, 1)
+
+    @staticmethod
+    def _get_virtual_location(identifier: NameIdentifier):
+        """Get the virtual location of the fileset.
+        :param identifier: The name identifier of the fileset
+        :return The virtual location.
+        """
+        return (
+            f"fileset/{identifier.namespace().level(1)}"
+            f"/{identifier.namespace().level(2)}"
+            f"/{identifier.name()}"
+        )
+
+    def _check_mount_single_file(
+        self, fileset: Fileset, fs: AbstractFileSystem, storage_type: StorageType
+    ):
+        """Check whether the fileset mounts a single file.
+        :param fileset: The fileset
+        :param fs: The file system corresponding to the fileset storage location
+        :param storage_type: The storage type of the fileset storage location
+        :return True if the fileset mounts a single file.
+        """
+        result: Dict = fs.info(
+            self._strip_storage_protocol(storage_type, fileset.storage_location())
+        )
+        return result["type"] == "file"
+
+    @staticmethod
+    def _pre_process_path(virtual_path):
+        """Pre-process the path.
+        We will uniformly process `gvfs://fileset/{catalog}/{schema}/{fileset_name}/xxx`
+        into the format of `fileset/{catalog}/{schema}/{fileset_name}/xxx`.
+ This is because some implementations of `PyArrow` and `fsspec` can only recognize this format. + :param virtual_path: The virtual path + :return The pre-processed path + """ + if isinstance(virtual_path, PurePosixPath): + pre_processed_path = virtual_path.as_posix() + else: + pre_processed_path = virtual_path + gvfs_prefix = f"{PROTOCOL_NAME}://" + if pre_processed_path.startswith(gvfs_prefix): + pre_processed_path = pre_processed_path[len(gvfs_prefix) :] + if not pre_processed_path.startswith("fileset/"): + raise GravitinoRuntimeException( + f"Invalid path:`{pre_processed_path}`. Expected path to start with `fileset/`." + " Example: fileset/{fileset_catalog}/{schema}/{fileset_name}/{sub_path}." + ) + return pre_processed_path + + @staticmethod + def _strip_storage_protocol(storage_type: StorageType, path: str): + """Strip the storage protocol from the path. + Before passing the path to the underlying file system for processing, + pre-process the protocol information in the path. + Some file systems require special processing. + For HDFS, we can pass the path like 'hdfs://{host}:{port}/xxx'. + For Local, we can pass the path like '/tmp/xxx'. + :param storage_type: The storage type + :param path: The path + :return: The stripped path + """ + if storage_type == StorageType.HDFS: + return path + if storage_type == StorageType.LOCAL: + return path[len(f"{StorageType.LOCAL.value}:") :] + raise GravitinoRuntimeException( + f"Storage type:{storage_type} doesn't support now." + ) + + +fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/requirements-dev.txt b/clients/client-python/requirements-dev.txt index 0d667441daa..47b384622c7 100644 --- a/clients/client-python/requirements-dev.txt +++ b/clients/client-python/requirements-dev.txt @@ -6,3 +6,7 @@ pylint==3.2.2 black==24.4.2 twine==5.1.0 coverage==7.5.1 +pandas==2.0.3 +pyarrow==15.0.2 +llama-index==0.10.40 +tenacity==8.3.0 \ No newline at end of file diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index e4e72675b81..2079d34d897 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -3,4 +3,8 @@ # the tools to publish the python client to Pypi requests -dataclasses-json \ No newline at end of file +dataclasses-json +readerwriterlock==1.0.9 +fsspec==2024.3.1 +pyarrow +cachetools==5.3.3 \ No newline at end of file diff --git a/clients/client-python/tests/unittests/mock_base.py b/clients/client-python/tests/unittests/mock_base.py new file mode 100644 index 00000000000..dd9bc3382fe --- /dev/null +++ b/clients/client-python/tests/unittests/mock_base.py @@ -0,0 +1,84 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" + +from gravitino import GravitinoMetalake, Catalog, Fileset +from gravitino.catalog.fileset_catalog import FilesetCatalog +from gravitino.dto.fileset_dto import FilesetDTO +from gravitino.dto.audit_dto import AuditDTO +from gravitino.dto.metalake_dto import MetalakeDTO + +from unittest.mock import patch + + +def mock_load_metalake(): + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + metalake_dto = MetalakeDTO( + _name="metalake_demo", + _comment="this is test", + _properties={"k": "v"}, + _audit=audit_dto, + ) + return GravitinoMetalake(metalake_dto) + + +def mock_load_fileset_catalog(): + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + catalog = FilesetCatalog( + name="fileset_catalog", + catalog_type=Catalog.Type.FILESET, + provider="hadoop", + comment="this is test", + properties={"k": "v"}, + audit=audit_dto, + rest_client=None, + ) + return catalog + + +def mock_load_fileset(name: str, location: str): + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + fileset = FilesetDTO( + _name=name, + _type=Fileset.Type.MANAGED, + _comment="this is test", + _properties={"k": "v"}, + _storage_location=location, + _audit=audit_dto, + ) + return fileset + + +def mock_data(cls): + @patch( + "gravitino.client.gravitino_client_base.GravitinoClientBase.load_metalake", + return_value=mock_load_metalake(), + ) + @patch( + "gravitino.client.gravitino_metalake.GravitinoMetalake.load_catalog", + return_value=mock_load_fileset_catalog(), + ) + @patch( + "gravitino.client.gravitino_client_base.GravitinoClientBase.check_version", + return_value=True, + ) + class Wrapper(cls): + pass + + return Wrapper diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py b/clients/client-python/tests/unittests/test_gvfs_with_local.py new file mode 100644 index 00000000000..73f1a2cea9b --- /dev/null +++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py @@ -0,0 +1,987 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. 
+""" + +import random +import string +import time +import unittest +import mock_base +import pandas +import pyarrow as pa +import pyarrow.dataset as dt +import pyarrow.parquet as pq +from unittest.mock import patch + +from gravitino import gvfs +from gravitino import NameIdentifier +from gravitino.dto.audit_dto import AuditDTO +from gravitino.dto.fileset_dto import FilesetDTO +from gravitino.filesystem.gvfs import FilesetContext, StorageType +from gravitino.exceptions.gravitino_runtime_exception import GravitinoRuntimeException +from fsspec.implementations.local import LocalFileSystem +from llama_index.core import SimpleDirectoryReader + + +def generate_unique_random_string(length): + characters = string.ascii_letters + string.digits + random_string = "".join(random.sample(characters, length)) + return random_string + + +@mock_base.mock_data +class TestLocalFilesystem(unittest.TestCase): + _local_base_dir_path: str = "file:/tmp/fileset" + _fileset_dir: str = ( + f"{_local_base_dir_path}/{generate_unique_random_string(10)}/fileset_catalog/tmp" + ) + + def setUp(self) -> None: + local_fs = LocalFileSystem() + if not local_fs.exists(self._fileset_dir): + local_fs.mkdir(self._fileset_dir) + + def tearDown(self) -> None: + local_fs = LocalFileSystem() + if local_fs.exists(self._local_base_dir_path): + local_fs.rm(self._local_base_dir_path, recursive=True) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_cache", f"{_fileset_dir}/test_cache" + ), + ) + def test_cache(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_cache" + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cache" + local_fs.mkdir(fileset_storage_location) + self.assertTrue(local_fs.exists(fileset_storage_location)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + cache_size=1, + cache_expired_time=1, + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + # wait 2 seconds + time.sleep(2) + self.assertIsNone( + fs._cache.get( + NameIdentifier.of_fileset( + "metalake_demo", "fileset_catalog", "tmp", "test_cache" + ) + ) + ) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset("test_ls", f"{_fileset_dir}/test_ls"), + ) + def test_ls(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_ls" + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_ls" + local_fs.mkdir(fileset_storage_location) + sub_dir_path = f"{fileset_storage_location}/test_1" + local_fs.mkdir(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test detail = false + file_list_without_detail = fs.ls(fileset_virtual_location, detail=False) + file_list_without_detail.sort() + self.assertEqual(2, len(file_list_without_detail)) + self.assertEqual( + file_list_without_detail[0], f"{fileset_virtual_location}/test_1" + ) + self.assertEqual( + file_list_without_detail[1], f"{fileset_virtual_location}/test_file_1.par" + ) 
+ + # test detail = true + file_list_with_detail = fs.ls(fileset_virtual_location, detail=True) + file_list_with_detail.sort(key=lambda x: x["name"]) + self.assertEqual(2, len(file_list_with_detail)) + self.assertEqual( + file_list_with_detail[0]["name"], f"{fileset_virtual_location}/test_1" + ) + self.assertEqual( + file_list_with_detail[1]["name"], + f"{fileset_virtual_location}/test_file_1.par", + ) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_info", f"{_fileset_dir}/test_info" + ), + ) + def test_info(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_info" + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_info" + local_fs.mkdir(fileset_storage_location) + sub_dir_path = f"{fileset_storage_location}/test_1" + local_fs.mkdir(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + dir_virtual_path = fileset_virtual_location + "/test_1" + dir_info = fs.info(dir_virtual_path) + self.assertEqual(dir_info["name"], dir_virtual_path) + + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + file_info = fs.info(file_virtual_path) + self.assertEqual(file_info["name"], file_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_exist", f"{_fileset_dir}/test_exist" + ), + ) + def test_exist(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_exist" + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_exist" + local_fs.mkdir(fileset_storage_location) + sub_dir_path = f"{fileset_storage_location}/test_1" + local_fs.mkdir(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + dir_virtual_path = fileset_virtual_location + "/test_1" + self.assertTrue(fs.exists(dir_virtual_path)) + + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_cp_file", f"{_fileset_dir}/test_cp_file" + ), + ) + def test_cp_file(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_cp_file" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cp_file" + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + with local_fs.open(sub_file_path, "wb") as f: + f.write(b"test_file_1") + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", 
metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + + cp_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par" + fs.cp_file(file_virtual_path, cp_file_virtual_path) + self.assertTrue(fs.exists(cp_file_virtual_path)) + with local_fs.open(sub_file_path, "rb") as f: + result = f.read() + self.assertEqual(b"test_file_1", result) + + # test invalid dst path + cp_file_invalid_virtual_path = ( + "fileset/fileset_catalog/tmp/invalid_fileset/test_cp_file_1.par" + ) + with self.assertRaises(GravitinoRuntimeException): + fs.cp_file(file_virtual_path, cp_file_invalid_virtual_path) + + # test mount a single file + local_fs.rm(path=fileset_storage_location, recursive=True) + self.assertFalse(local_fs.exists(fileset_storage_location)) + local_fs.touch(fileset_storage_location) + self.assertTrue(local_fs.exists(fileset_storage_location)) + with self.assertRaises(GravitinoRuntimeException): + fs.cp_file(file_virtual_path, cp_file_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset("test_mv", f"{_fileset_dir}/test_mv"), + ) + def test_mv(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_mv" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mv" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + another_dir_path = f"{fileset_storage_location}/another_dir" + local_fs.mkdirs(another_dir_path) + self.assertTrue(local_fs.exists(another_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + + mv_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par" + fs.mv(file_virtual_path, mv_file_virtual_path) + self.assertTrue(fs.exists(mv_file_virtual_path)) + + mv_another_dir_virtual_path = ( + fileset_virtual_location + "/another_dir/test_file_2.par" + ) + fs.mv(mv_file_virtual_path, mv_another_dir_virtual_path) + self.assertTrue(fs.exists(mv_another_dir_virtual_path)) + + # test not exist dir + not_exist_dst_dir_path = fileset_virtual_location + "/not_exist/test_file_2.par" + with self.assertRaises(FileNotFoundError): + fs.mv(path1=mv_another_dir_virtual_path, path2=not_exist_dst_dir_path) + + # test invalid dst path + mv_file_invalid_virtual_path = ( + "fileset/fileset_catalog/tmp/invalid_fileset/test_cp_file_1.par" + ) + with self.assertRaises(GravitinoRuntimeException): + fs.mv(path1=file_virtual_path, path2=mv_file_invalid_virtual_path) + + # test mount a single file + local_fs.rm(path=fileset_storage_location, recursive=True) + self.assertFalse(local_fs.exists(fileset_storage_location)) + local_fs.touch(fileset_storage_location) + self.assertTrue(local_fs.exists(fileset_storage_location)) + with self.assertRaises(GravitinoRuntimeException): + fs.mv(file_virtual_path, mv_file_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset("test_rm", f"{_fileset_dir}/test_rm"), + ) + def 
test_rm(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_rm" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test delete file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + fs.rm(file_virtual_path) + self.assertFalse(fs.exists(file_virtual_path)) + + # test delete dir with recursive = false + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises(ValueError): + fs.rm(dir_virtual_path, recursive=False) + + # test delete dir with recursive = true + fs.rm(dir_virtual_path, recursive=True) + self.assertFalse(fs.exists(dir_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_rm_file", f"{_fileset_dir}/test_rm_file" + ), + ) + def test_rm_file(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_rm_file" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm_file" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test delete file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + fs.rm_file(file_virtual_path) + self.assertFalse(fs.exists(file_virtual_path)) + + # test delete dir + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises(IsADirectoryError): + fs.rm_file(dir_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_rmdir", f"{_fileset_dir}/test_rmdir" + ), + ) + def test_rmdir(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_rmdir" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rmdir" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + 
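+        # the virtual fileset root resolves to the local storage location created above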
self.assertTrue(fs.exists(fileset_virtual_location)) + + # test delete file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + with self.assertRaises(NotADirectoryError): + fs.rmdir(file_virtual_path) + + # test delete dir + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + fs.rmdir(dir_virtual_path) + self.assertFalse(fs.exists(dir_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_open", f"{_fileset_dir}/test_open" + ), + ) + def test_open(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_open" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_open" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test open and write file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + with fs.open(file_virtual_path, mode="wb") as f: + f.write(b"test_open_write") + self.assertTrue(fs.info(file_virtual_path)["size"] > 0) + + # test open and read file + with fs.open(file_virtual_path, mode="rb") as f: + self.assertEqual(b"test_open_write", f.read()) + + # test open dir + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises(IsADirectoryError): + fs.open(dir_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_mkdir", f"{_fileset_dir}/test_mkdir" + ), + ) + def test_mkdir(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_mkdir" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mkdir" + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test mkdir dir which exists + existed_dir_virtual_path = fileset_virtual_location + self.assertTrue(fs.exists(existed_dir_virtual_path)) + with self.assertRaises(FileExistsError): + fs.mkdir(existed_dir_virtual_path) + + # test mkdir dir with create_parents = false + parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir" + self.assertFalse(fs.exists(parent_not_exist_virtual_path)) + with self.assertRaises(FileNotFoundError): + fs.mkdir(parent_not_exist_virtual_path, create_parents=False) + + # test mkdir dir with create_parents = true + parent_not_exist_virtual_path2 = fileset_virtual_location + "/not_exist/sub_dir" + self.assertFalse(fs.exists(parent_not_exist_virtual_path2)) + fs.mkdir(parent_not_exist_virtual_path2, 
create_parents=True) + self.assertTrue(fs.exists(parent_not_exist_virtual_path2)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_makedirs", f"{_fileset_dir}/test_makedirs" + ), + ) + def test_makedirs(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_makedirs" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_makedirs" + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test mkdir dir which exists + existed_dir_virtual_path = fileset_virtual_location + self.assertTrue(fs.exists(existed_dir_virtual_path)) + with self.assertRaises(FileExistsError): + fs.mkdirs(existed_dir_virtual_path) + + # test mkdir dir not exist + parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir" + self.assertFalse(fs.exists(parent_not_exist_virtual_path)) + fs.makedirs(parent_not_exist_virtual_path) + self.assertTrue(fs.exists(parent_not_exist_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_created", f"{_fileset_dir}/test_created" + ), + ) + def test_created(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_created" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_created" + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test mkdir dir which exists + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + self.assertIsNotNone(fs.created(dir_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_modified", f"{_fileset_dir}/test_modified" + ), + ) + def test_modified(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_modified" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_modified" + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test mkdir dir which exists + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + self.assertIsNotNone(fs.modified(dir_virtual_path)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_cat_file", f"{_fileset_dir}/test_cat_file" + ), + ) + def test_cat_file(self, mock_method1, 
mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_cat_file" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cat_file" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test open and write file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + with fs.open(file_virtual_path, mode="wb") as f: + f.write(b"test_cat_file") + self.assertTrue(fs.info(file_virtual_path)["size"] > 0) + + # test cat file + content = fs.cat_file(file_virtual_path) + self.assertEqual(b"test_cat_file", content) + + # test cat dir + dir_virtual_path = fileset_virtual_location + "/sub_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises(IsADirectoryError): + fs.cat_file(dir_virtual_path) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_get_file", f"{_fileset_dir}/test_get_file" + ), + ) + def test_get_file(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_get_file" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "fileset/fileset_catalog/tmp/test_get_file" + + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + sub_dir_path = f"{fileset_storage_location}/sub_dir" + local_fs.mkdirs(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test open and write file + file_virtual_path = fileset_virtual_location + "/test_file_1.par" + self.assertTrue(fs.exists(file_virtual_path)) + with fs.open(file_virtual_path, mode="wb") as f: + f.write(b"test_get_file") + self.assertTrue(fs.info(file_virtual_path)["size"] > 0) + + # test get file + local_path = self._fileset_dir + "/local_file.par" + local_fs.touch(local_path) + self.assertTrue(local_fs.exists(local_path)) + fs.get_file(file_virtual_path, local_path) + self.assertEqual(b"test_get_file", local_fs.cat_file(local_path)) + + # test get a dir + dir_virtual_path = fileset_virtual_location + "/sub_dir" + local_path = self._fileset_dir + "/local_dir" + self.assertTrue(fs.exists(dir_virtual_path)) + fs.get_file(dir_virtual_path, local_path) + self.assertTrue(local_fs.exists(local_path)) + + # test get a file to a remote file + remote_path = "gvfs://" + fileset_virtual_location + "/test_file_2.par" + with self.assertRaises(GravitinoRuntimeException): + fs.get_file(file_virtual_path, remote_path) + + def test_convert_actual_path(self, mock_method1, mock_method2, mock_method3): + # test convert actual hdfs path + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + 
hdfs_fileset: FilesetDTO = FilesetDTO( + _name="test_f1", + _comment="", + _type=FilesetDTO.Type.MANAGED, + _storage_location="hdfs://localhost:8090/fileset/test_f1", + _audit=audit_dto, + _properties={}, + ) + mock_hdfs_context: FilesetContext = FilesetContext( + name_identifier=NameIdentifier.of_fileset( + "test_metalake", "test_catalog", "test_schema", "test_f1" + ), + storage_type=StorageType.HDFS, + fileset=hdfs_fileset, + actual_path=hdfs_fileset.storage_location() + "/actual_path", + fs=LocalFileSystem(), + ) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + # test actual path not start with storage location + actual_path = "/not_start_with_storage/ttt" + with self.assertRaises(GravitinoRuntimeException): + fs._convert_actual_path(actual_path, mock_hdfs_context) + + # test actual path start with storage location + actual_path = "/fileset/test_f1/actual_path" + virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context) + self.assertEqual( + "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path + ) + + # test convert actual local path + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + local_fileset: FilesetDTO = FilesetDTO( + _name="test_f1", + _comment="", + _type=FilesetDTO.Type.MANAGED, + _storage_location="file:/tmp/fileset/test_f1", + _audit=audit_dto, + _properties={}, + ) + mock_local_context: FilesetContext = FilesetContext( + name_identifier=NameIdentifier.of_fileset( + "test_metalake", "test_catalog", "test_schema", "test_f1" + ), + storage_type=StorageType.LOCAL, + fileset=local_fileset, + actual_path=local_fileset.storage_location() + "/actual_path", + fs=LocalFileSystem(), + ) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + # test actual path not start with storage location + actual_path = "/not_start_with_storage/ttt" + with self.assertRaises(GravitinoRuntimeException): + fs._convert_actual_path(actual_path, mock_local_context) + + # test actual path start with storage location + actual_path = "/tmp/fileset/test_f1/actual_path" + virtual_path = fs._convert_actual_path(actual_path, mock_local_context) + self.assertEqual( + "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path + ) + + def test_convert_info(self, mock_method1, mock_method2, mock_method3): + # test convert actual hdfs path + audit_dto = AuditDTO( + _creator="test", + _create_time="2022-01-01T00:00:00Z", + _last_modifier="test", + _last_modified_time="2024-04-05T10:10:35.218Z", + ) + hdfs_fileset: FilesetDTO = FilesetDTO( + _name="test_f1", + _comment="", + _type=FilesetDTO.Type.MANAGED, + _storage_location="hdfs://localhost:8090/fileset/test_f1", + _audit=audit_dto, + _properties={}, + ) + mock_hdfs_context: FilesetContext = FilesetContext( + name_identifier=NameIdentifier.of_fileset( + "test_metalake", "test_catalog", "test_schema", "test_f1" + ), + storage_type=StorageType.HDFS, + fileset=hdfs_fileset, + actual_path=hdfs_fileset.storage_location() + "/actual_path", + fs=LocalFileSystem(), + ) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", metalake_name="metalake_demo" + ) + # test actual path not start with storage location + actual_path = "/not_start_with_storage/ttt" + with self.assertRaises(GravitinoRuntimeException): + fs._convert_actual_path(actual_path, mock_hdfs_context) + + # test 
+        # test actual path that starts with the storage location
+        actual_path = "/fileset/test_f1/actual_path"
+        virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context)
+        self.assertEqual(
+            "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+        )
+
+        # test convert actual local path
+        audit_dto = AuditDTO(
+            _creator="test",
+            _create_time="2022-01-01T00:00:00Z",
+            _last_modifier="test",
+            _last_modified_time="2024-04-05T10:10:35.218Z",
+        )
+        local_fileset: FilesetDTO = FilesetDTO(
+            _name="test_f1",
+            _comment="",
+            _type=FilesetDTO.Type.MANAGED,
+            _storage_location="file:/tmp/fileset/test_f1",
+            _audit=audit_dto,
+            _properties={},
+        )
+        mock_local_context: FilesetContext = FilesetContext(
+            name_identifier=NameIdentifier.of_fileset(
+                "test_metalake", "test_catalog", "test_schema", "test_f1"
+            ),
+            storage_type=StorageType.LOCAL,
+            fileset=local_fileset,
+            actual_path=local_fileset.storage_location() + "/actual_path",
+            fs=LocalFileSystem(),
+        )
+
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+        )
+        # test actual path that does not start with the storage location
+        actual_path = "/not_start_with_storage/ttt"
+        with self.assertRaises(GravitinoRuntimeException):
+            fs._convert_actual_path(actual_path, mock_local_context)
+
+        # test actual path that starts with the storage location
+        actual_path = "/tmp/fileset/test_f1/actual_path"
+        virtual_path = fs._convert_actual_path(actual_path, mock_local_context)
+        self.assertEqual(
+            "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+        )
+
+    def test_extract_identifier(self, mock_method1, mock_method2, mock_method3):
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+        )
+        with self.assertRaises(GravitinoRuntimeException):
+            fs._extract_identifier(path=None)
+
+        invalid_path = "s3://bucket_1/test_catalog/schema/fileset/ttt"
+        with self.assertRaises(GravitinoRuntimeException):
+            fs._extract_identifier(path=invalid_path)
+
+        valid_path = "fileset/test_catalog/schema/fileset/ttt"
+        identifier: NameIdentifier = fs._extract_identifier(path=valid_path)
+        self.assertEqual("metalake_demo", identifier.namespace().level(0))
+        self.assertEqual("test_catalog", identifier.namespace().level(1))
+        self.assertEqual("schema", identifier.namespace().level(2))
+        self.assertEqual("fileset", identifier.name())
+
+    @patch(
+        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
+        return_value=mock_base.mock_load_fileset(
+            "test_pandas", f"{_fileset_dir}/test_pandas"
+        ),
+    )
+    def test_pandas(self, mock_method1, mock_method2, mock_method3, mock_method4):
+        local_fs = LocalFileSystem()
+        fileset_storage_location = f"{self._fileset_dir}/test_pandas"
+        local_fs.mkdir(fileset_storage_location)
+
+        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pandas"
+        data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090", metalake_name="test_metalake"
+        )
+        # to parquet
+        data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs)
+        self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet"))
+
+        # read parquet
+        ds1 = pandas.read_parquet(
+            path=fileset_virtual_location + "/test.parquet", filesystem=fs
+        )
+        self.assertTrue(data.equals(ds1))
+        storage_options = {
+            "server_uri": "http://localhost:8090",
+            "metalake_name": "test_metalake",
+        }
+        # to csv
+        data.to_csv(
"/test.csv", + index=False, + storage_options=storage_options, + ) + self.assertTrue(local_fs.exists(fileset_storage_location + "/test.csv")) + + # read csv + ds2 = pandas.read_csv( + fileset_virtual_location + "/test.csv", storage_options=storage_options + ) + self.assertTrue(data.equals(ds2)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_pyarrow", f"{_fileset_dir}/test_pyarrow" + ), + ) + def test_pyarrow(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_pyarrow" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pyarrow" + data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", metalake_name="test_metalake" + ) + + # to parquet + data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs) + self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet")) + + # read as arrow dataset + arrow_dataset = dt.dataset( + fileset_virtual_location + "/test.parquet", filesystem=fs + ) + arrow_tb_1 = arrow_dataset.to_table() + + arrow_tb_2 = pa.Table.from_pandas(data) + self.assertTrue(arrow_tb_1.equals(arrow_tb_2)) + + # read as arrow parquet dataset + arrow_tb_3 = pq.read_table( + fileset_virtual_location + "/test.parquet", filesystem=fs + ) + self.assertTrue(arrow_tb_3.equals(arrow_tb_2)) + + @patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", + return_value=mock_base.mock_load_fileset( + "test_llama_index", f"{_fileset_dir}/test_llama_index" + ), + ) + def test_llama_index(self, mock_method1, mock_method2, mock_method3, mock_method4): + local_fs = LocalFileSystem() + fileset_storage_location = f"{self._fileset_dir}/test_llama_index" + local_fs.mkdir(fileset_storage_location) + + fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_llama_index" + data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", metalake_name="test_metalake" + ) + + storage_options = { + "server_uri": "http://localhost:8090", + "metalake_name": "test_metalake", + } + # to csv + data.to_csv( + fileset_virtual_location + "/test.csv", + index=False, + storage_options=storage_options, + ) + self.assertTrue(local_fs.exists(fileset_storage_location + "/test.csv")) + + data.to_csv( + fileset_virtual_location + "/sub_dir/test1.csv", + index=False, + storage_options=storage_options, + ) + self.assertTrue( + local_fs.exists(fileset_storage_location + "/sub_dir/test1.csv") + ) + + reader = SimpleDirectoryReader( + input_dir="fileset/fileset_catalog/tmp/test_llama_index", + fs=fs, + recursive=True, # recursively searches all subdirectories + ) + documents = reader.load_data() + self.assertEqual(len(documents), 2) + doc_1 = documents[0] + result_1 = [line.strip().split(", ") for line in doc_1.text.split("\n")] + self.assertEqual(4, len(result_1)) + for row in result_1: + if row[0] == "A": + self.assertEqual(row[1], "20") + elif row[0] == "B": + self.assertEqual(row[1], "21") + elif row[0] == "C": + self.assertEqual(row[1], "19") + elif row[0] == "D": + self.assertEqual(row[1], "18")