Using the Python buffer protocol in pipe #959

Open
@tomwhite

Is it possible to avoid memory copies when writing to S3 by taking advantage of the Python buffer protocol?

In particular, it would be great if we could use a memoryview object in calls to pipe().
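For context, here is a minimal sketch of why a memoryview matters here. It uses only the standard library; the variable names are illustrative and not taken from fsspec or s3fs. A memoryview shares memory with its source, whereas converting it to bytes makes a full copy, which is the copy this issue hopes to avoid.

```python
# Illustrative only: a bytearray stands in for any buffer-protocol object
# (a NumPy array exposes the same protocol).
source = bytearray(b"hello")

view = memoryview(source)   # zero-copy: shares memory with `source`
snapshot = bytes(view)      # full copy of the underlying bytes

# Mutating the source is visible through the view, but not in the copy.
source[0] = ord("j")
view_sees_change = view[0] == ord("j")
snapshot_unchanged = snapshot == b"hello"

print(view_sees_change, snapshot_unchanged)
```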

Using bytes or bytearray works fine:

>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", b"hello") # works
[{'ResponseMetadata': {'RequestId': 'E54C5V2H9M1ZE1V1', 'HostId': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'x-amz-request-id': 'E54C5V2H9M1ZE1V1', 'date': 'Thu, 10 Apr 2025 13:19:42 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", bytearray(b"hello")) # works
[{'ResponseMetadata': {'RequestId': 'C58QYEAVJTBW96Y6', 'HostId': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'x-amz-request-id': 'C58QYEAVJTBW96Y6', 'date': 'Thu, 10 Apr 2025 13:20:04 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]

But using a memoryview, or just a NumPy array (which also supports the buffer protocol), fails:

>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", memoryview(np.frombuffer(bytearray(b"hello"), dtype="uint8")))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 418, in _pipe
    return await _run_coros_in_chunks(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 268, in _run_coros_in_chunks
    result, k = await done.pop()
                ^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 245, in _run_coro
    return await asyncio.wait_for(coro, timeout=timeout), i
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/asyncio/tasks.py", line 452, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 1185, in _pipe_file
    out = await self._call_s3(
          ^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 371, in _call_s3
    return await _error_wrapper(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 146, in _error_wrapper
    raise err
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 114, in _error_wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 369, in _make_api_call
    request_dict = await self._convert_to_request_dict(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 440, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/botocore/validate.py", line 381, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Body, value: <memory at 0x102f99a80>, type: <class 'memoryview'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

This may be a limitation in the underlying botocore library (the error is raised by its parameter validation), but perhaps there is a way to avoid copying the source data (in this case a NumPy array) that I'm missing.
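One possible direction, since the error message lists "file-like object" as a valid type for Body, is to wrap the buffer in a small read-only file-like object. The sketch below is hypothetical: `MemoryViewReader` is a name invented here, not part of fsspec, s3fs, or botocore, and it has not been tried against pipe(), which may expect more of its argument (such as a length). Each read still copies one chunk, but the whole buffer is never duplicated at once.

```python
import io


class MemoryViewReader(io.RawIOBase):
    """Hypothetical read-only, seekable file-like view over a buffer."""

    def __init__(self, data):
        super().__init__()
        # Flat byte view of the source; no copy is made here.
        # cast("B") assumes the source buffer is C-contiguous.
        self._view = memoryview(data).cast("B")
        self._pos = 0

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            pos = offset
        elif whence == io.SEEK_CUR:
            pos = self._pos + offset
        elif whence == io.SEEK_END:
            pos = len(self._view) + offset
        else:
            raise ValueError(f"invalid whence: {whence!r}")
        if pos < 0:
            raise ValueError("negative seek position")
        self._pos = pos
        return pos

    def readinto(self, b):
        # Copy at most len(dest) bytes into the caller's buffer.
        dest = memoryview(b).cast("B")
        chunk = self._view[self._pos:self._pos + len(dest)]
        n = len(chunk)
        dest[:n] = chunk
        self._pos += n
        return n


# Usage: io.RawIOBase derives read() and readall() from readinto().
reader = MemoryViewReader(bytearray(b"hello"))
first = reader.read(2)
rest = reader.read()
```

Subclassing io.RawIOBase keeps the sketch short because only readinto() has to be implemented; whether aiobotocore would stream from such an object without further internal copies is an open question.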

cc @jakirkham who commented about this in zarr-developers/zarr-python#2972 (comment)
