Commit

Merge branch 'release-7.0.0'
mpenkov committed Feb 26, 2024
2 parents 299ca52 + ebddffd commit 882e52e
Showing 22 changed files with 829 additions and 198 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,18 @@
# Unreleased
# 7.0.0, 2024-02-26

* Upgrade dev status classifier to stable (PR [#798](https://github.com/piskvorky/smart_open/pull/798), [@seebi](https://github.com/seebi))
* Add zstandard compression support (PR [#801](https://github.com/piskvorky/smart_open/pull/801), [@rlrs](https://github.com/rlrs))
* Support moto 4 & 5 (PR [#802](https://github.com/piskvorky/smart_open/pull/802), [@jayvdb](https://github.com/jayvdb))
* Add logic for handling large files in MultipartWriter uploads to S3 (PR [#796](https://github.com/piskvorky/smart_open/pull/796), [@jakkdl](https://github.com/jakkdl))
* Add support for SSH connection via aliases from `~/.ssh/config` (PR [#790](https://github.com/piskvorky/smart_open/pull/790), [@wbeardall](https://github.com/wbeardall))
* Secure the connection using SSL when connecting to the FTPS server (PR [#793](https://github.com/piskvorky/smart_open/pull/793), [@wammaster](https://github.com/wammaster))
* Make GCS I/O 1000x faster by avoiding an unnecessary API call (PR [#788](https://github.com/piskvorky/smart_open/pull/788), [@JohnHBrock](https://github.com/JohnHBrock))
* Retry finalizing multipart S3 upload (PR [#785](https://github.com/piskvorky/smart_open/pull/785), [@ddelange](https://github.com/ddelange))
* Handle exceptions during writes to Azure (PR [#783](https://github.com/piskvorky/smart_open/pull/783), [@ddelange](https://github.com/ddelange))
* Fix formatting of python code in MIGRATING_FROM_OLDER_VERSIONS.rst (PR [#795](https://github.com/piskvorky/smart_open/pull/795), [@kenahoo](https://github.com/kenahoo))
* Fix `__str__` method in SinglepartWriter (PR [#791](https://github.com/piskvorky/smart_open/pull/791), [@ThosRTanner](https://github.com/ThosRTanner))
* Fix `KeyError: 'ContentRange'` when received full content from S3 (PR [#789](https://github.com/piskvorky/smart_open/pull/789), [@messense](https://github.com/messense))
* Propagate `__exit__` call to the underlying filestream (PR [#786](https://github.com/piskvorky/smart_open/pull/786), [@ddelange](https://github.com/ddelange))
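
A rough sketch of the new zstandard support listed above (the paths are hypothetical, and the optional `zstandard` dependency must be installed, e.g. `pip install smart_open[zst]`):

```python
import smart_open

# The ".zst" suffix now triggers transparent decompression on read.
with smart_open.open("example.txt.zst", "rb") as fin:
    raw = fin.read()

# The codec can also be forced when the object name lacks the suffix.
with smart_open.open("s3://my-bucket/blob.bin", "rb", compression=".zst") as fin:
    raw = fin.read()
```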

## 6.4.0, 2023-09-07

18 changes: 9 additions & 9 deletions MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -4,17 +4,17 @@ Migrating to the new compression parameter
smart_open versions 6.0.0 and above no longer support the ``ignore_ext`` parameter.
Use the ``compression`` parameter instead:

```python
fin = smart_open.open("/path/file.gz", ignore_ext=True)  # No
fin = smart_open.open("/path/file.gz", compression="disable")  # Yes
fin = smart_open.open("/path/file.gz", ignore_ext=False)  # No
fin = smart_open.open("/path/file.gz")  # Yes
fin = smart_open.open("/path/file.gz", compression="infer_from_extension")  # Yes, if you want to be explicit
fin = smart_open.open("/path/file", compression=".gz")  # Yes
```

.. code-block:: python

    fin = smart_open.open("/path/file.gz", ignore_ext=True)  # No
    fin = smart_open.open("/path/file.gz", compression="disable")  # Yes
    fin = smart_open.open("/path/file.gz", ignore_ext=False)  # No
    fin = smart_open.open("/path/file.gz")  # Yes
    fin = smart_open.open("/path/file.gz", compression="infer_from_extension")  # Yes, if you want to be explicit
    fin = smart_open.open("/path/file", compression=".gz")  # Yes

Migrating to the new client-based S3 API
========================================
41 changes: 38 additions & 3 deletions README.rst
@@ -328,7 +328,7 @@ You can customize the credentials when constructing the session for the client.
aws_session_token=SESSION_TOKEN,
)
client = session.client('s3', endpoint_url=..., config=...)
fin = open('s3://bucket/key', transport_params=dict(client=client))
fin = open('s3://bucket/key', transport_params={'client': client})
Your second option is to specify the credentials within the S3 URL itself:
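
A rough sketch of what that URL form looks like (the access key id and secret key below are placeholders, not real credentials):

.. code-block:: python

    # Both values are illustrative placeholders.
    fin = open('s3://aws_access_key_id:aws_secret_access_key@bucket/key')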

@@ -341,6 +341,18 @@ Your second option is to specify the credentials within the S3 URL itself:
*Important*: ``smart_open`` ignores configuration files from the older ``boto`` library.
Port your old ``boto`` settings to ``boto3`` in order to use them with ``smart_open``.

S3 Advanced Usage
-----------------

Additional keyword arguments can be propagated to the boto3 methods that ``smart_open`` uses under the hood, via the ``client_kwargs`` transport parameter.

For instance, to upload a blob with ``Metadata``, an ``ACL``, or a ``StorageClass``, pass those keyword arguments to ``create_multipart_upload`` (`docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload>`__).

.. code-block:: python

    kwargs = {'Metadata': {'version': '2'}, 'ACL': 'authenticated-read', 'StorageClass': 'STANDARD_IA'}
    fout = open('s3://bucket/key', 'wb', transport_params={'client_kwargs': {'S3.Client.create_multipart_upload': kwargs}})
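
If you want to verify afterwards that the metadata was applied, one option is a plain ``head_object`` call with boto3 (a sketch; the bucket and key above are placeholders):

.. code-block:: python

    import boto3

    s3 = boto3.client('s3')
    # User-defined metadata comes back under the 'Metadata' key of the response.
    response = s3.head_object(Bucket='bucket', Key='key')
    print(response['Metadata'])          # e.g. {'version': '2'}
    print(response.get('StorageClass'))  # e.g. 'STANDARD_IA'
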
Iterating Over an S3 Bucket's Contents
--------------------------------------

@@ -392,7 +404,20 @@ and pass it to the Client. To create an API token for use in the example below,
token = os.environ['GOOGLE_API_TOKEN']
credentials = Credentials(token=token)
client = Client(credentials=credentials)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params=dict(client=client))
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params={'client': client})
GCS Advanced Usage
------------------

Additional keyword arguments can be propagated to the GCS open method (`docs <https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_open>`__), which is used by ``smart_open`` under the hood, using the ``blob_open_kwargs`` transport parameter.

Additional blob properties (`docs <https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#properties>`__) can be set before an upload, as long as they are not read-only, using the ``blob_properties`` transport parameter.

.. code-block:: python

    open_kwargs = {'predefined_acl': 'authenticated-read'}
    properties = {'metadata': {'version': '2'}, 'storage_class': 'COLDLINE'}
    fout = open('gs://bucket/key', 'wb', transport_params={'blob_open_kwargs': open_kwargs, 'blob_properties': properties})
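
To double-check the result, the blob can be re-fetched with the same ``google-cloud-storage`` client (a sketch; the bucket, key, and default credentials are assumptions):

.. code-block:: python

    from google.cloud.storage import Client

    client = Client()
    # get_blob() reloads the blob's current properties from the API.
    blob = client.bucket('bucket').get_blob('key')
    print(blob.metadata)       # e.g. {'version': '2'}
    print(blob.storage_class)  # e.g. 'COLDLINE'
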
Azure Credentials
-----------------
@@ -413,11 +438,21 @@ to setting up authentication.
from azure.storage.blob import BlobServiceClient
azure_storage_connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
client = BlobServiceClient.from_connection_string(azure_storage_connection_string)
fin = open('azure://my_container/my_blob.txt', transport_params=dict(client=client))
fin = open('azure://my_container/my_blob.txt', transport_params={'client': client})
If you need more credential options, refer to the
`Azure Storage authentication guide <https://docs.microsoft.com/en-us/azure/storage/common/storage-samples-python#authentication>`__.

Azure Advanced Usage
--------------------

Additional keyword arguments can be propagated to the ``commit_block_list`` method (`docs <https://azuresdkdocs.blob.core.windows.net/$web/python/azure-storage-blob/12.14.1/azure.storage.blob.html#azure.storage.blob.BlobClient.commit_block_list>`__), which is used by ``smart_open`` under the hood for uploads, using the ``blob_kwargs`` transport parameter.

.. code-block:: python

    kwargs = {'metadata': {'version': '2'}}
    fout = open('azure://container/key', 'wb', transport_params={'blob_kwargs': kwargs})
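
The committed metadata can likewise be read back through ``azure-storage-blob`` (a sketch; the container and blob names are placeholders):

.. code-block:: python

    import os
    from azure.storage.blob import BlobServiceClient

    connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
    client = BlobServiceClient.from_connection_string(connection_string)
    # get_blob_properties() returns the metadata committed with the block list.
    blob_client = client.get_blob_client(container='container', blob='key')
    print(blob_client.get_blob_properties().metadata)  # e.g. {'version': '2'}
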
Drop-in replacement of ``pathlib.Path.open``
--------------------------------------------

10 changes: 10 additions & 0 deletions integration-tests/test_gcs.py
@@ -28,6 +28,9 @@ def write_read(key, content, write_mode, read_mode, **kwargs):
with smart_open.open(key, read_mode, **kwargs) as fin:
return fin.read()

def open_only(key, read_mode, **kwargs) -> None:
with smart_open.open(key, read_mode, **kwargs):
pass

def read_length_prefixed_messages(key, read_mode, **kwargs):
result = io.BytesIO()
@@ -121,3 +124,10 @@ def test_gcs_performance_small_reads(benchmark):

actual = benchmark(read_length_prefixed_messages, key, 'rb', buffering=ONE_MIB)
assert actual == one_megabyte_of_msgs

def test_gcs_performance_open(benchmark):
# we don't need to use a uri that actually exists in order to call GCS's open()
key = "gs://some-bucket/some_blob.txt"
transport_params = {'client': google.cloud.storage.Client()}
benchmark(open_only, key, 'rb', transport_params=transport_params)
assert True
58 changes: 46 additions & 12 deletions integration-tests/test_s3_ported.py
@@ -15,6 +15,7 @@
import contextlib
import gzip
import io
import os
import unittest
import uuid
import warnings
@@ -203,20 +204,53 @@ def test_write(self):

def test_multipart(self):
"""Does s3 multipart chunking work correctly?"""
with smart_open.s3.MultipartWriter(BUCKET_NAME, self.key, min_part_size=10) as fout:
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 4)
data_dir = os.path.join(os.path.dirname(__file__), "../smart_open/tests/test_data")
with open(os.path.join(data_dir, "crime-and-punishment.txt"), "rb") as fin:
crime = fin.read()
data = b''
ps = 5 * 1024 * 1024
while len(data) < ps:
data += crime

title = "Преступление и наказание\n\n".encode()
to_be_continued = "\n\n... продолжение следует ...\n\n".encode()

key = "WriterTest.test_multipart"
with smart_open.s3.MultipartWriter(BUCKET_NAME, key, part_size=ps) as fout:
#
# Write some data without triggering an upload
#
fout.write(title)
assert fout._total_parts == 0
assert fout._buf.tell() == 48

#
# Trigger a part upload
#
fout.write(data)
assert fout._total_parts == 1
assert fout._buf.tell() == 661

#
# Write _without_ triggering a part upload
#
fout.write(to_be_continued)
assert fout._total_parts == 1
assert fout._buf.tell() == 710

fout.write(b"test\n")
self.assertEqual(fout._buf.tell(), 9)
self.assertEqual(fout._total_parts, 0)

fout.write(b"test")
self.assertEqual(fout._buf.tell(), 0)
self.assertEqual(fout._total_parts, 1)
#
# We closed the writer, so the final part must have been uploaded
#
assert fout._buf.tell() == 0
assert fout._total_parts == 2

data = read_key(self.key)
self.assertEqual(data, b"testtest\ntest")
#
# read back the same key and check its content
#
with smart_open.s3.open(BUCKET_NAME, key, 'rb') as fin:
got = fin.read()
want = title + data + to_be_continued
assert want == got

def test_empty_key(self):
"""Does writing no data cause key with an empty value to be created?"""
12 changes: 8 additions & 4 deletions setup.py
@@ -35,20 +35,22 @@ def _get_version():
def read(fname):
return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()


base_deps = ['wrapt']
aws_deps = ['boto3']
gcs_deps = ['google-cloud-storage>=2.6.0']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
http_deps = ['requests']
ssh_deps = ['paramiko']
zst_deps = ['zstandard']

all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
tests_require = all_deps + [
'moto[server]',
'responses',
'boto3',
'pytest',
'pytest-rerunfailures'
'pytest-rerunfailures',
'pytest-benchmark',
]

setup(
@@ -70,6 +72,7 @@ def read(fname):
license='MIT',
platforms='any',

install_requires=base_deps,
tests_require=tests_require,
extras_require={
'test': tests_require,
@@ -80,13 +83,14 @@
'http': http_deps,
'webhdfs': http_deps,
'ssh': ssh_deps,
'zst': zst_deps,
},
python_requires=">=3.6,<4.0",

test_suite="smart_open.tests",

classifiers=[
'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
21 changes: 18 additions & 3 deletions smart_open/azure.py
@@ -417,18 +417,30 @@ def __init__(
def flush(self):
pass

def terminate(self):
"""Do not commit block list on abort.
Uploaded (uncommitted) blocks will be garbage collected after 7 days.
See also https://stackoverflow.com/a/69673084/5511061."""
logger.debug('%s: terminating multipart upload', self)
if not self.closed:
self._block_list = []
self._is_closed = True
logger.debug('%s: terminated multipart upload', self)

#
# Override some methods from io.IOBase.
#
def close(self):
logger.debug("closing")
if not self.closed:
logger.debug('%s: completing multipart upload', self)
if self._current_part.tell() > 0:
self._upload_part()
self._blob.commit_block_list(self._block_list, **self._blob_kwargs)
self._block_list = []
self._is_closed = True
logger.debug("successfully closed")
logger.debug('%s: completed multipart upload', self)

@property
def closed(self):
@@ -509,7 +521,10 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if exc_type is not None:
self.terminate()
else:
self.close()

def __str__(self):
return "(%s, %r, %r)" % (
12 changes: 10 additions & 2 deletions smart_open/compression.py
@@ -71,6 +71,9 @@ def register_compressor(ext, callback):
def tweak_close(outer, inner):
"""Ensure that closing the `outer` stream closes the `inner` stream as well.
Deprecated: smart_open.open().__exit__ now always calls __exit__ on the
underlying filestream.
Use this when your compression library's `close` method does not
automatically close the underlying filestream. See
https://github.com/RaRe-Technologies/smart_open/issues/630 for an
@@ -93,14 +96,18 @@ def close_both(*args):
def _handle_bz2(file_obj, mode):
from bz2 import BZ2File
result = BZ2File(file_obj, mode)
tweak_close(result, file_obj)
return result


def _handle_gzip(file_obj, mode):
import gzip
result = gzip.GzipFile(fileobj=file_obj, mode=mode)
tweak_close(result, file_obj)
return result


def _handle_zstd(file_obj, mode):
import zstandard as zstd
result = zstd.ZstdDecompressor().stream_reader(file_obj, closefd=True)
return result


@@ -145,3 +152,4 @@ def compression_wrapper(file_obj, mode, compression=INFER_FROM_EXTENSION, filena
#
register_compressor('.bz2', _handle_bz2)
register_compressor('.gz', _handle_gzip)
register_compressor('.zst', _handle_zstd)
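
The `register_compressor` hook used above for `.zst` is part of smart_open's public API, so callers can map additional extensions themselves. A minimal sketch, assuming a hypothetical `.gzip` alias and file path:

```python
import gzip

import smart_open
from smart_open.compression import register_compressor

# Hypothetical: some datasets ship gzip files with a ".gzip" suffix, which
# smart_open does not map by default.  Reuse the stdlib gzip wrapper for it.
def _handle_gzip_alias(file_obj, mode):
    return gzip.GzipFile(fileobj=file_obj, mode=mode)

register_compressor('.gzip', _handle_gzip_alias)

with smart_open.open('example.txt.gzip', 'rt') as fin:  # path is hypothetical
    print(fin.read())
```
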
4 changes: 3 additions & 1 deletion smart_open/ftp.py
@@ -10,6 +10,7 @@
"""

import logging
import ssl
import urllib.parse
import smart_open.utils
from ftplib import FTP, FTP_TLS, error_reply
@@ -85,7 +86,8 @@ def convert_transport_params_to_args(transport_params):
def _connect(hostname, username, port, password, secure_connection, transport_params):
kwargs = convert_transport_params_to_args(transport_params)
if secure_connection:
ftp = FTP_TLS(**kwargs)
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
ftp = FTP_TLS(context=ssl_context, **kwargs)
else:
ftp = FTP(**kwargs)
try:
5 changes: 1 addition & 4 deletions smart_open/gcs.py
@@ -128,10 +128,7 @@ def Reader(bucket,
warn_deprecated('line_terminator')

bkt = client.bucket(bucket)
blob = bkt.get_blob(key)

if blob is None:
raise google.cloud.exceptions.NotFound(f'blob {key} not found in {bucket}')
blob = bkt.blob(key)

return blob.open('rb', **blob_open_kwargs)
