Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for HUAWEI Object Storage Service (OBS) #823

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Other examples of URLs that ``smart_open`` accepts::
s3://my_key:my_secret@my_server:my_port@my_bucket/my_key
gs://my_bucket/my_blob
azure://my_bucket/my_blob
obs://bucket_id.server:port/object_key
hdfs:///path/file
hdfs://path/file
webhdfs://host:port/path/file
Expand Down Expand Up @@ -290,6 +291,7 @@ Transport-specific Options
- WebHDFS
- GCS
- Azure Blob Storage
- OBS (Huawei Object Storage)

Each option involves setting up its own set of parameters.
For example, for accessing S3, you often need to set up authentication, like API keys or a profile name.
Expand Down Expand Up @@ -455,6 +457,55 @@ Additional keyword arguments can be propagated to the ``commit_block_list`` meth
kwargs = {'metadata': {'version': 2}}
fout = open('azure://container/key', 'wb', transport_params={'blob_kwargs': kwargs})

OBS Credentials
---------------
``smart_open`` uses the ``esdk-obs-python`` library to talk to OBS.
Please see `esdk-obs-python docs <https://support.huaweicloud.com/intl/en-us/sdk-python-devg-obs/obs_22_0500.html>`__.

There are several ways to provide Access key, Secret Key and Security Token

- Using env variables
- Using custom client params

AK, SK, ST can be encrypted in this case You need install and configure `security provider <https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337>`__.


OBS Advanced Usage
--------------------
- Supported env variables:

OBS_ACCESS_KEY_ID,
OBS_SECRET_ACCESS_KEY,
OBS_SECURITY_TOKEN,
SMART_OPEN_OBS_USE_CLIENT_WRITE_MODE,
SMART_OPEN_OBS_DECRYPT_AK_SK,
SMART_OPEN_OBS_SCC_LIB_PATH,
SMART_OPEN_OBS_SCC_CONF_PATH

- Configuration via code
.. code-block:: python

client = {'access_key_id': 'ak', 'secret_access_key': 'sk', 'security_token': 'st', 'server': 'server_url'}
headers = obs.PutObjectHeader(contentType='text/plain')
transport_params = {
>>> # client can be dict with parameters supported by the obs.ObsClient or instance of the obs.ObsClient
>>> 'client': client,
>>> # additional header for request, please see esdk-obs-python docs
>>> 'headers': headers,
>>> # if True obs.ObsClient will be take write method argument as readable object to get bytes. For writing mode only.
>>> # Please see docs for ObsClient.putContent api.
>>> 'use_obs_client_write_mode': True,
>>> # True if need decrypt Ak, Sk, St
>>> # It required to install CryptoAPI libs.
>>> # https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337
>>> 'decrypt_ak_sk' : True,
>>> # path to python libs of the Crypto provider
>>> 'scc_lib_path': '/usr/lib/scc',
>>> # path to config file of the Crypto provider
>>> 'scc_conf_path': '/home/user/scc.conf'}

fout = open('obs://bucket_id.server:port/object_key', 'wb', transport_params=transport_params)

Drop-in replacement of ``pathlib.Path.open``
--------------------------------------------

Expand Down
128 changes: 128 additions & 0 deletions integration-tests/test_obs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
import io
import os

import obs
from obs import ObsClient

import smart_open
from smart_open.obs import parse_uri

_OBS_URL = os.environ.get('SO_OBS_URL')

assert _OBS_URL is not None, 'please set the SO_OBS_URL environment variable'

assert os.environ.get('OBS_ACCESS_KEY_ID') is not None, \
'please set the OBS_ACCESS_KEY_ID environment variable'
assert os.environ.get('OBS_SECRET_ACCESS_KEY') is not None, \
'please set the OBS_SECRET_ACCESS_KEY environment variable'


def _clear_bucket(obs_client: obs.ObsClient, bucket_id: str):
objects = obs_client.listObjects(bucketName=bucket_id)
for content in objects.body.contents:
print(content.get('key'))
_delete_object(obs_client=obs_client,
bucket_id=bucket_id,
object_key=content.get('key'))


def _delete_object(obs_client: obs.ObsClient, bucket_id: str, object_key: str):
try:
resp = obs_client.deleteObject(bucketName=bucket_id, objectKey=object_key)
if resp.status < 300:
print('requestId:', resp.requestId)
print('deleteMarker:', resp.body.deleteMarker)
except Exception as ex:
print(ex)


def initialize_bucket():
parsed = parse_uri(_OBS_URL)
server = f'https://{parsed.get("server")}'
bucket_id = parsed.get('bucket_id')
obs_client = ObsClient(server=server, security_provider_policy='ENV')
_clear_bucket(obs_client=obs_client, bucket_id=bucket_id)


def write_read(key, content, write_mode, read_mode, **kwargs):
with smart_open.open(key, write_mode, **kwargs) as fout:
fout.write(content)
with smart_open.open(key, read_mode, **kwargs) as fin:
return fin.read()


def read_length_prefixed_messages(key, read_mode, **kwargs):
result = io.BytesIO()

with smart_open.open(key, read_mode, **kwargs) as fin:
length_byte = fin.read(1)
while len(length_byte):
result.write(length_byte)
msg = fin.read(ord(length_byte))
result.write(msg)
length_byte = fin.read(1)
return result.getvalue()


def test_obs_readwrite_binary(benchmark):
initialize_bucket()

key = _OBS_URL + '/sanity.txt'
binary = 'с гранатою в кармане, с чекою в руке'.encode()
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_obs_readwrite_binary_gzip(benchmark):
initialize_bucket()

key = _OBS_URL + '/sanity.txt.gz'
binary = 'не чайки здесь запели на знакомом языке'.encode()
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_obs_performance(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024 * 128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _OBS_URL + '/performance.txt'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte


def test_obs_performance_gz(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024 * 128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _OBS_URL + '/performance.txt.gz'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte


def test_obs_performance_small_reads(benchmark):
initialize_bucket()

ONE_MIB = 1024 ** 2
one_megabyte_of_msgs = io.BytesIO()
msg = b'\x0f' + b'0123456789abcde' # a length-prefixed "message"
for _ in range(0, ONE_MIB, len(msg)):
one_megabyte_of_msgs.write(msg)
one_megabyte_of_msgs = one_megabyte_of_msgs.getvalue()

key = _OBS_URL + '/many_reads_performance.bin'

with smart_open.open(key, 'wb') as fout:
fout.write(one_megabyte_of_msgs)

actual = benchmark(read_length_prefixed_messages, key, 'rb', buffering=ONE_MIB)
assert actual == one_megabyte_of_msgs
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ def read(fname):
http_deps = ['requests']
ssh_deps = ['paramiko']
zst_deps = ['zstandard']
obs_deps = ['esdk-obs-python']

all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps + obs_deps
tests_require = all_deps + [
'moto[server]',
'responses',
Expand Down Expand Up @@ -83,6 +84,7 @@ def read(fname):
'webhdfs': http_deps,
'ssh': ssh_deps,
'zst': zst_deps,
'obs': obs_deps,
},
python_requires=">=3.7,<4.0",

Expand Down
Loading