Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2059] feat(client-python): Support Gravitino Virtual FileSystem in Python #3528

Merged
merged 21 commits into from
Jun 18, 2024
Prev Previous commit
Next Next commit
add uts
  • Loading branch information
xiaojiebao committed Jun 12, 2024
commit 77fb7921564146f31853fe5cbcc3514599f1fc07
198 changes: 161 additions & 37 deletions clients/client-python/gravitino/filesystem/gvfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class StorageType(Enum):


class FilesetContext:
"""A context object that holds the information about the fileset and the file system that are used in
the GravitinoVirtualFileSystem's operations.
"""

def __init__(
self,
name_identifier: NameIdentifier,
Expand Down Expand Up @@ -60,6 +64,14 @@ def get_storage_type(self):


class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
"""This is a virtual file system through which users can access `fileset` and
other resources.

It obtains the actual storage location corresponding to the resource from the
Gravitino server, and creates an independent file system for it to act as an agent for users to
access the underlying storage.
"""

protocol = PROTOCOL_NAME
_identifier_pattern = re.compile("^fileset/([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")

Expand All @@ -79,6 +91,12 @@ def sign(self, path, expiration=None, **kwargs):
raise RuntimeError("Unsupported method now.")

def ls(self, path, detail=True, **kwargs):
"""List the files and directories info of the path.
:param path: Virtual fileset path
:param detail: Whether to show the details for the files and directories info
:param kwargs: Extra args
xloya marked this conversation as resolved.
Show resolved Hide resolved
:return: If detail is True, a list of file info dicts; otherwise, a list of file paths
"""
context: FilesetContext = self._get_fileset_context(path)
if detail:
entries = [
Expand All @@ -92,22 +110,32 @@ def ls(self, path, detail=True, **kwargs):
]
return entries
entries = [
self._actual_path_to_virtual_path(entry_path, context)
self._convert_actual_path(entry_path, context)
for entry_path in context.fs.ls(
self._strip_storage_protocol(context.storage_type, context.actual_path),
detail=True,
detail=False,
)
]
return entries

def info(self, path, **kwargs):
"""Get the file info of a virtual fileset path.

The path is resolved to a fileset context, the info is fetched from the
context's file system using the actual path (after `_strip_storage_protocol`),
and the result is converted by `_convert_info` before being returned.

:param path: Virtual fileset path
:param kwargs: Extra args (not forwarded to the underlying file system here)
:return: A file info dict, as produced by `_convert_info`
"""
context: FilesetContext = self._get_fileset_context(path)
# Query the underlying file system with the actual (non-virtual) path.
actual_info: Dict = context.fs.info(
self._strip_storage_protocol(context.storage_type, context.actual_path)
)
return self._convert_info(actual_info, context)

def exists(self, path, **kwargs):
"""Check if a file or a directory exists.
:param path: Virtual fileset path
:param kwargs: Extra args
:return: A boolean indicating whether the file or directory exists
xloya marked this conversation as resolved.
Show resolved Hide resolved
"""
context: FilesetContext = self._get_fileset_context(path)
try:
context.fs.info(
Expand All @@ -118,6 +146,11 @@ def exists(self, path, **kwargs):
return True

def cp_file(self, path1, path2, **kwargs):
"""Copy a file.
:param path1: Virtual src fileset path
:param path2: Virtual dst fileset path, should be consistent with the src path fileset identifier
:param kwargs: Extra args
"""
path1 = self._pre_process_path(path1)
path2 = self._pre_process_path(path2)
xloya marked this conversation as resolved.
Show resolved Hide resolved
src_identifier: NameIdentifier = self._extract_identifier(path1)
Expand Down Expand Up @@ -147,21 +180,12 @@ def cp_file(self, path1, path2, **kwargs):

# pylint: disable=W0221
xloya marked this conversation as resolved.
Show resolved Hide resolved
def mv(self, path1, path2, **kwargs):
"""
1. Supports move a file which src and dst under the same directory.
fs.mv(path1='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/test2.txt',
path2='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/test3.txt')
2. Supports move a directory which src and dst under the same directory.
fs.mv(path1='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/qqq',
path2='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/zzz')
3. Supports move a file which destination's parent directory is existed.
The directory `gvfs://fileset/fileset_catalog/tmp/tmp_fileset/xyz/qqq` is existed.
fs.mv(path1='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/test2.txt',
path2='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/xyz/qqq/test3.txt')
4. Supports move a directory which destination's parent directory is existed.
The directory `gvfs://fileset/fileset_catalog/tmp/tmp_fileset/xyz/qqq` is existed.
fs.mv(path1='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/ttt/zzz',
path2='gvfs://fileset/fileset_catalog/tmp/tmp_fileset/xyz/qqq/ppp')
"""Move a file to another directory.
This can move a file to another existing directory.
If the target path directory does not exist, an exception will be thrown.
:param path1: Virtual src fileset path
:param path2: Virtual dst fileset path, should be consistent with the src path fileset identifier
:param kwargs: Extra args
"""
path1 = self._pre_process_path(path1)
path2 = self._pre_process_path(path2)
Expand Down Expand Up @@ -190,9 +214,15 @@ def mv(self, path1, path2, **kwargs):
)

def _rm(self, path):
raise RuntimeError("Deprecated method, use rm_file method instead.")
raise RuntimeError("Deprecated method, use `rm_file` method instead.")

def rm(self, path, recursive=False, maxdepth=None):
"""Remove a file or directory.
:param path: Virtual fileset path
:param recursive: Whether to remove the directory recursively.
When removing a directory, this parameter should be True.
:param maxdepth: The maximum depth to remove the directory recursively.
"""
context: FilesetContext = self._get_fileset_context(path)
context.fs.rm(
self._strip_storage_protocol(context.storage_type, context.actual_path),
Expand All @@ -201,19 +231,33 @@ def rm(self, path, recursive=False, maxdepth=None):
)
xloya marked this conversation as resolved.
Show resolved Hide resolved

def rm_file(self, path):
"""Remove a file.

The path is resolved to a fileset context, and the removal is delegated to
the `rm_file` method of the context's file system, using the actual path
after `_strip_storage_protocol`.

:param path: Virtual fileset path
"""
context: FilesetContext = self._get_fileset_context(path)
context.fs.rm_file(
self._strip_storage_protocol(context.storage_type, context.actual_path)
)

def rmdir(self, path):
"""Remove a directory.

The path is resolved to a fileset context, and the removal is delegated to
the `rmdir` method of the context's file system, using the actual path
after `_strip_storage_protocol`.

:param path: Virtual fileset path
"""
context: FilesetContext = self._get_fileset_context(path)
context.fs.rmdir(
self._strip_storage_protocol(context.storage_type, context.actual_path)
)
xloya marked this conversation as resolved.
Show resolved Hide resolved

# pylint: disable=W0221
def _open(self, path, mode="rb", block_size=None, seekable=True, **kwargs):
"""Open a file to read/write/append.
:param path: Virtual fileset path
:param mode: The mode now supports: rb(read), wb(write), ab(append). See builtin ``open()``
:param block_size: Some indication of buffering - this is a value in bytes
:param seekable: Some file systems support seek; if so, this param can be set to True
:param kwargs: Extra args
:return A file-like object from the filesystem
"""
context: FilesetContext = self._get_fileset_context(path)
if context.storage_type == StorageType.HDFS:
return context.fs._open(
Expand All @@ -233,16 +277,23 @@ def _open(self, path, mode="rb", block_size=None, seekable=True, **kwargs):
raise RuntimeError(f"Storage type:{context.storage_type} doesn't support now.")

def mkdir(self, path, create_parents=True, **kwargs):
if create_parents:
self.makedirs(path, exist_ok=True)
else:
context: FilesetContext = self._get_fileset_context(path)
context.fs.mkdir(
self._strip_storage_protocol(context.storage_type, context.actual_path),
create_parents,
)
"""Make a directory.
:param path: Virtual fileset path
:param create_parents: Create parent directories if missing when set to True
:param kwargs: Extra args
"""
context: FilesetContext = self._get_fileset_context(path)
context.fs.mkdir(
self._strip_storage_protocol(context.storage_type, context.actual_path),
create_parents,
**kwargs,
)

def makedirs(self, path, exist_ok=True):
xloya marked this conversation as resolved.
Show resolved Hide resolved
"""Make a directory.
:param path: Virtual fileset path
:param exist_ok: Continue if a directory already exists
"""
context: FilesetContext = self._get_fileset_context(path)
context.fs.makedirs(
self._strip_storage_protocol(context.storage_type, context.actual_path),
Expand All @@ -253,13 +304,26 @@ def created(self, path):
raise RuntimeError("Unsupported method now.")
xloya marked this conversation as resolved.
Show resolved Hide resolved

def modified(self, path):
"""Return the modified time of the file at the given path, if it exists.
:param path: Virtual fileset path
:return Modified time(datetime.datetime)
"""
context: FilesetContext = self._get_fileset_context(path)
return context.fs.modified(
self._strip_storage_protocol(context.storage_type, context.actual_path)
).mtime
)

def cat_file(self, path, start=None, end=None, **kwargs):
kwargs["seekable"] = start not in [None, 0]
"""Get the content of a file.
:param path: Virtual fileset path
:param start:
xloya marked this conversation as resolved.
Show resolved Hide resolved
:param end:
Bytes limits of the read. If negative, backwards from end,
like usual python slices. Either can be None for start or
end of file, respectively
:param kwargs: Extra args
:return File content
"""
context: FilesetContext = self._get_fileset_context(path)
return context.fs.cat_file(
self._strip_storage_protocol(context.storage_type, context.actual_path),
Expand All @@ -269,15 +333,25 @@ def cat_file(self, path, start=None, end=None, **kwargs):
)

def get_file(self, rpath, lpath, **kwargs):
xloya marked this conversation as resolved.
Show resolved Hide resolved
kwargs["seekable"] = False
"""Copy single remote file to local.
:param rpath: Remote file path
:param lpath: Local file path
:param kwargs: Extra args
"""
context: FilesetContext = self._get_fileset_context(rpath)
return context.fs.get_file(
context.fs.get_file(
self._strip_storage_protocol(context.storage_type, context.actual_path),
lpath,
**kwargs,
)

def _actual_path_to_virtual_path(self, path, context: FilesetContext):
def _convert_actual_path(self, path, context: FilesetContext):
"""Convert an actual path to a virtual path.
The virtual path is like `fileset/{catalog}/{schema}/{fileset}/xxx`.
:param path: Actual path
:param context: Fileset context
:return A virtual path
"""
if context.storage_type == StorageType.HDFS:
actual_prefix = infer_storage_options(context.fileset.storage_location())[
"path"
Expand All @@ -299,15 +373,24 @@ def _actual_path_to_virtual_path(self, path, context: FilesetContext):
return f"{path.replace(actual_prefix, virtual_location)}"

def _convert_info(self, entry: Dict, context: FilesetContext):
xloya marked this conversation as resolved.
Show resolved Hide resolved
path = self._actual_path_to_virtual_path(entry["name"], context)
"""Convert a file info from actual entry to virtual entry.
:param entry: A dict of the actual file info
:param context: Fileset context
:return A dict of the virtual file info
"""
path = self._convert_actual_path(entry["name"], context)
return {
"name": path,
"size": entry["size"],
"type": entry["type"],
"mtime": entry["mtime"],
}

def _get_fileset_context(self, virtual_path: str) -> FilesetContext:
def _get_fileset_context(self, virtual_path: str):
"""Get a fileset context from the cache or the Gravitino server
:param virtual_path: The virtual path
:return A fileset context
"""
virtual_path: str = self._pre_process_path(virtual_path)
identifier: NameIdentifier = self._extract_identifier(virtual_path)
read_lock = self.cache_lock.gen_rlock()
Expand Down Expand Up @@ -374,7 +457,11 @@ def _get_fileset_context(self, virtual_path: str) -> FilesetContext:
finally:
write_lock.release()

def _extract_identifier(self, path) -> NameIdentifier:
def _extract_identifier(self, path):
"""Extract the fileset identifier from the path.
:param path: The virtual fileset path
:return The fileset identifier
"""
if path is None:
raise RuntimeError("path which need be extracted cannot be null or empty.")

Expand All @@ -386,6 +473,10 @@ def _extract_identifier(self, path) -> NameIdentifier:
raise RuntimeError(f"path: `{path}` doesn't contains valid identifier.")

def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset:
"""Load the fileset from the server.
:param identifier: The fileset identifier
:return The fileset
"""
catalog: Catalog = self.client.load_catalog(
NameIdentifier.of_catalog(
identifier.namespace().level(0), identifier.namespace().level(1)
Expand All @@ -400,7 +491,15 @@ def _get_actual_path_by_ident(
fs: AbstractFileSystem,
storage_type: StorageType,
virtual_path: str,
) -> str:
):
"""Get the actual path by the virtual path and the fileset.
:param identifier: The fileset identifier
:param fileset: The fileset
:param fs: The file system corresponding to the fileset storage location
:param storage_type: The storage type of the fileset storage location
:param virtual_path: The virtual fileset path
:return The actual path.
"""
virtual_location = self._get_virtual_location(identifier)
storage_location = fileset.storage_location()
if self._check_mount_single_file(fileset, fs, storage_type):
Expand All @@ -413,7 +512,11 @@ def _get_actual_path_by_ident(
return virtual_path.replace(virtual_location, storage_location, 1)

@staticmethod
def _get_virtual_location(identifier: NameIdentifier) -> str:
def _get_virtual_location(identifier: NameIdentifier):
"""Get the virtual location of the fileset.
:param identifier: The name identifier of the fileset
:return The virtual location.
"""
return (
f"fileset/{identifier.namespace().level(1)}"
f"/{identifier.namespace().level(2)}"
Expand All @@ -422,14 +525,27 @@ def _get_virtual_location(identifier: NameIdentifier) -> str:

def _check_mount_single_file(
self, fileset: Fileset, fs: AbstractFileSystem, storage_type: StorageType
) -> bool:
):
"""Check whether the fileset is mounted on a single file.
:param fileset: The fileset
:param fs: The file system corresponding to the fileset storage location
:param storage_type: The storage type of the fileset storage location
:return: True if the fileset is mounted on a single file.
"""
result: Dict = fs.info(
self._strip_storage_protocol(storage_type, fileset.storage_location())
)
return result["type"] == "file"

@staticmethod
def _pre_process_path(path):
"""Pre-process the path.
We will uniformly process `gvfs://fileset/{catalog}/{schema}/{fileset_name}/xxx`
into the format of `fileset/{catalog}/{schema}/{fileset_name}/xxx`.
This is because some implementations of `PyArrow` and `fsspec` can only recognize this format.
:param path: The virtual path
:return The pre-processed path
"""
if isinstance(path, PurePosixPath):
path = path.as_posix()
gvfs_prefix = f"{PROTOCOL_NAME}://"
Expand All @@ -444,6 +560,14 @@ def _pre_process_path(path):

@staticmethod
def _strip_storage_protocol(storage_type: StorageType, path: str):
"""Strip the storage protocol from the path.
Before passing the path to the underlying file system for processing,
pre-process the protocol information in the path.
Some file systems require special processing.
:param storage_type: The storage type
:param path: The path
:return: The stripped path
"""
if storage_type == StorageType.HDFS:
return path
if storage_type == StorageType.FILE:
Expand Down
Loading