Skip to content

Commit

Permalink
[#5221] feat(python-client): Support OSS for fileset python client (#…
Browse files Browse the repository at this point in the history
…5239)

### What changes were proposed in this pull request?

Add support for Aliyun OSS python client.

### Why are the changes needed?

It's a need

Fix: #5221

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Test locally. Do the following change to `test_gvfs_with_oss.py` and run
`./gradlew :clients:client-python:test -PskipDockerTests=false`


<img width="1261" alt="image"
src="https://github.com/user-attachments/assets/a57a3f2e-178b-4b50-8219-5a95d0329cd1">
<img width="1211" alt="image"
src="https://github.com/user-attachments/assets/b2675a44-d9db-4d11-a505-8e098ef3173f">

Co-authored-by: Qi Yu <yuqi@datastrato.com>
  • Loading branch information
github-actions[bot] and yuqi1129 authored Oct 25, 2024
1 parent a62e118 commit bf43a07
Show file tree
Hide file tree
Showing 4 changed files with 437 additions and 1 deletion.
78 changes: 78 additions & 0 deletions clients/client-python/gravitino/filesystem/gvfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class StorageType(Enum):
LOCAL = "file"
GCS = "gs"
S3A = "s3a"
OSS = "oss"


class FilesetContextPair:
Expand Down Expand Up @@ -318,6 +319,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
StorageType.HDFS,
StorageType.GCS,
StorageType.S3A,
StorageType.OSS,
]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
Expand Down Expand Up @@ -567,6 +569,14 @@ def _convert_actual_path(
or storage_location.startswith(f"{StorageType.S3A.value}://")
):
actual_prefix = infer_storage_options(storage_location)["path"]
elif storage_location.startswith(f"{StorageType.OSS.value}:/"):
ops = infer_storage_options(storage_location)
if "host" not in ops or "path" not in ops:
raise GravitinoRuntimeException(
f"Storage location:{storage_location} doesn't support now."
)

actual_prefix = ops["host"] + ops["path"]
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
else:
Expand Down Expand Up @@ -733,6 +743,8 @@ def _recognize_storage_type(path: str):
return StorageType.GCS
if path.startswith(f"{StorageType.S3A.value}://"):
return StorageType.S3A
if path.startswith(f"{StorageType.OSS.value}://"):
return StorageType.OSS
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
Expand All @@ -756,12 +768,46 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
:param storage_type: The storage type
:param path: The path
:return: The stripped path
We will handle OSS differently from S3 and GCS, because OSS has different behavior than S3 and GCS.
Please see the following example:
```
>> oss = context_pair.filesystem()
>> oss.ls('oss://bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls')
DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/
test_gvfs_schema/test_gvfs_fileset
DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema
/test_gvfs_fileset/', 'delimiter': '/'}
[]
>> oss.ls('bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls')
DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema
/test_gvfs_fileset/test_ls
DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema
/test_gvfs_fileset/test_ls/', 'delimiter': '/'}
[{'name': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls
/test.file', 'type': 'file', 'size': 0, 'LastModified': 1729754793,
'Size': 0, 'Key': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/
test_gvfs_fileset/test_ls/test.file'}]
```
Please take a look at the above example: if we do not remove the protocol (starts with oss://),
it will always return an empty array when we call `oss.ls`, however, if we remove the protocol,
it will produce the correct result as expected.
"""
if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A):
return path
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]

# OSS has different behavior than S3 and GCS, if we do not remove the
# protocol, it will always return an empty array.
if storage_type == StorageType.OSS:
if path.startswith(f"{StorageType.OSS.value}://"):
return path[len(f"{StorageType.OSS.value}://") :]
return path

raise GravitinoRuntimeException(
f"Storage type:{storage_type} doesn't support now."
)
Expand Down Expand Up @@ -835,6 +881,8 @@ def _get_filesystem(self, actual_file_location: str):
fs = self._get_gcs_filesystem()
elif storage_type == StorageType.S3A:
fs = self._get_s3_filesystem()
elif storage_type == StorageType.OSS:
fs = self._get_oss_filesystem()
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
Expand Down Expand Up @@ -887,5 +935,35 @@ def _get_s3_filesystem(self):
endpoint_url=aws_endpoint_url,
)

def _get_oss_filesystem(self):
# get 'oss_access_key_id' from oss options, if the key is not found, throw an exception
oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY)
if oss_access_key_id is None:
raise GravitinoRuntimeException(
"OSS access key id is not found in the options."
)

# get 'oss_secret_access_key' from oss options, if the key is not found, throw an exception
oss_secret_access_key = self._options.get(
GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY
)
if oss_secret_access_key is None:
raise GravitinoRuntimeException(
"OSS secret access key is not found in the options."
)

# get 'oss_endpoint_url' from oss options, if the key is not found, throw an exception
oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
if oss_endpoint_url is None:
raise GravitinoRuntimeException(
"OSS endpoint url is not found in the options."
)

return importlib.import_module("ossfs").OSSFileSystem(
key=oss_access_key_id,
secret=oss_secret_access_key,
endpoint=oss_endpoint_url,
)


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
4 changes: 4 additions & 0 deletions clients/client-python/gravitino/filesystem/gvfs_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ class GVFSConfig:
GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"

GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key"
GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_key"
GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint"
3 changes: 2 additions & 1 deletion clients/client-python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ fsspec==2024.3.1
pyarrow==15.0.2
cachetools==5.3.3
gcsfs==2024.3.1
s3fs==2024.3.1
s3fs==2024.3.1
ossfs==2023.12.0
Loading

0 comments on commit bf43a07

Please sign in to comment.