feat: implement range-based S3 Reader for byte range requests #339

Open
wants to merge 29 commits into
base: main
from

Conversation

jet-tong
Contributor

@jet-tong jet-tong commented Jun 6, 2025

Description

This PR implements range-based S3 reading capabilities for byte range requests alongside the existing sequential reader, allowing for more efficient random access patterns when reading from S3. The changes use the Strategy pattern to separate different reading implementations while maintaining backward compatibility.

Key changes:

  • Range-based S3 Reader
    • Refactor existing reader into _SequentialS3Reader class, which remains the default behaviour
    • Implement new _RangedS3Reader class to allow for byte range requests on-demand
    • Update Rust components to support range parameters in get_object calls with mountpoint client
  • S3ReaderConfig Class
    • Introduce S3ReaderConfig class to configure S3 reader type via reader_config parameter
    • Expose reader_config in S3MapDataset and S3IterableDataset
  • Extend test coverage to verify both reader implementations
    • Parametrize unit and e2e tests to cover both sequential and range-based readers
    • Add dataset creation tests, instance type checks, and default behaviour tests
  • Add new user agent information for dataset type and reader type

The default behaviour remains sequential reading for backward compatibility.
Users can opt into range-based reading by creating an S3ReaderConfig and passing it via the reader_config parameter, e.g.:

from s3torchconnector import S3MapDataset, S3ReaderConfig

reader_config = S3ReaderConfig.range_based()
dataset = S3MapDataset.from_prefix(
    DATASET_URI, 
    region=REGION,
    reader_config=reader_config
)

# Random access example
item = dataset[0]
item.seek(5 * 1024 * 1024)   # Seek to specific offset
content = item.read(1024)    # Read desired bytes

Additional context

The current default reader, SEQUENTIAL, buffers data up to the position reached by seek/read/readinto calls and is optimized for full reads and repeated access. However, in many cases users need only a specific portion of a large S3 object and do not need the extra memory buffering and data-loading overhead.

The RANGE_BASED reader enables efficient data access through byte-range requests. It creates a stream for a specified range and returns bytes directly to users without buffering. This approach optimizes memory usage and data transfer for large objects that require random access patterns or partial reads.

The reader issues precise range requests, which reduces bandwidth and memory consumption. While it may generate more API calls than the sequential reader, it is intended to perform better for selective data access. It keeps the standard stream interface, so it integrates with existing code that needs range-based access.
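
To make the trade-off between the two readers concrete, here is a minimal sketch that counts bytes transferred for one seek-then-read. It is not the code in this PR: `make_fetch`, `SequentialSketch`, and `RangedSketch` are hypothetical names, an in-memory `bytes` object stands in for S3, and the half-open `[start, end)` range convention is an assumption.

```python
from typing import Callable, List, Optional

# Hypothetical fetch signature: returns the bytes in [start, end) of the object.
Fetch = Callable[[Optional[int], Optional[int]], bytes]


def make_fetch(obj: bytes, log: List[int]) -> Fetch:
    """Build an in-memory stand-in for a ranged GET that records each request."""

    def fetch(start: Optional[int] = None, end: Optional[int] = None) -> bytes:
        chunk = obj[start:end]
        log.append(len(chunk))  # bytes transferred by this request
        return chunk

    return fetch


class SequentialSketch:
    """Buffers everything from the start of the object up to the bytes requested."""

    def __init__(self, fetch: Fetch) -> None:
        self._fetch = fetch
        self._buffer = bytearray()
        self._position = 0

    def seek(self, offset: int) -> None:
        self._position = offset

    def read(self, size: int) -> bytes:
        needed = self._position + size
        if len(self._buffer) < needed:
            # Extend the buffer from where it currently ends up to `needed`.
            self._buffer += self._fetch(len(self._buffer), needed)
        chunk = bytes(self._buffer[self._position:needed])
        self._position += len(chunk)
        return chunk


class RangedSketch:
    """Requests only the bytes asked for and keeps no buffer."""

    def __init__(self, fetch: Fetch) -> None:
        self._fetch = fetch
        self._position = 0

    def seek(self, offset: int) -> None:
        self._position = offset

    def read(self, size: int) -> bytes:
        chunk = self._fetch(self._position, self._position + size)
        self._position += len(chunk)
        return chunk


obj = bytes(range(256)) * 40  # 10,240-byte stand-in object
seq_log: List[int] = []
rng_log: List[int] = []
seq = SequentialSketch(make_fetch(obj, seq_log))
rng = RangedSketch(make_fetch(obj, rng_log))
for reader in (seq, rng):
    reader.seek(8000)
    assert reader.read(100) == obj[8000:8100]
```

In this toy setup the sequential sketch transfers and holds 8,100 bytes to serve a 100-byte read at offset 8,000, while the ranged sketch transfers 100 bytes. A second read of the same region would cost the ranged sketch another request but would be served from memory by the sequential one.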

No Breaking Changes:

  • Private attributes in S3Reader were temporarily moved to access via _reader during initial refactor, but the breaking change was reverted in this commit after another refactor to use __new__.

  • I have updated the CHANGELOG or README if appropriate

Related items

Testing

  • Updated test coverage for both reader implementations
  • Performance testing for range-based reader
  • Verification of backward compatibility with existing usage patterns

By submitting this pull request, I confirm that my contribution is made under the terms of BSD 3-Clause License and I agree to the terms of the LICENSE.

@jet-tong jet-tong requested a review from a team as a code owner June 6, 2025 15:00
@jet-tong jet-tong requested a deployment to integration-tests June 6, 2025 15:00 — with GitHub Actions Waiting
@jet-tong jet-tong requested a deployment to integration-tests June 6, 2025 16:43 — with GitHub Actions Waiting
@jet-tong jet-tong added the python Pull requests that update Python code label Jun 18, 2025
@jet-tong jet-tong temporarily deployed to integration-tests June 18, 2025 19:19 — with GitHub Actions Inactive
jet-tong added 2 commits June 19, 2025 09:39
- Ensure reader_config is an instance of S3ReaderConfig
- Add unit test for invalid config type
- Include usage examples in S3ReaderConfig
@jet-tong jet-tong temporarily deployed to integration-tests June 19, 2025 08:42 — with GitHub Actions Inactive
@@ -114,6 +114,35 @@ For example, assuming the following directory bucket name `my-test-bucket--usw2-
usw2-az1, then the URI used will look like: `s3://my-test-bucket--usw2-az1--x-s3/<PREFIX>` (**please note that the
prefix for Amazon S3 Express One Zone should end with '/'**), paired with region us-west-2.

## Reader Configurations
Contributor Author

I've included both the original and the alternative constructor for S3ReaderConfig. Do we want to highlight just one of them for users?

Contributor

I'd probably want to highlight whichever one we think is most user friendly

Contributor

In this section we want to focus on the difference in using S3Client.get_object when we need to download the entire object vs. only part of the object. The sample with S3MapDataset could be kept at the end of that section.

Contributor

Let's also mention that readers shouldn't be shared between threads, as they are not thread-safe.
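
A minimal sketch of the pattern this implies: each thread opens its own reader instead of sharing one. `io.BytesIO` stands in for an S3 reader here, and `open_reader` is a hypothetical factory, not part of the library.

```python
import io
import threading
from typing import List

OBJECT = bytes(range(256))  # in-memory stand-in for an S3 object


def open_reader() -> io.BytesIO:
    """Hypothetical factory: returns a fresh reader with its own position."""
    return io.BytesIO(OBJECT)


def worker(offset: int, size: int, results: List[bytes], index: int) -> None:
    reader = open_reader()  # created inside the thread, never shared
    reader.seek(offset)
    results[index] = reader.read(size)


offsets = [0, 64, 128, 192]
results: List[bytes] = [b""] * len(offsets)
threads = [
    threading.Thread(target=worker, args=(offset, 16, results, i))
    for i, offset in enumerate(offsets)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Since every reader tracks its own position, a shared instance would let one thread's seek change where another thread's read starts. Giving each thread its own reader avoids that without locks.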

self._client = S3Client(
    self.region,
    endpoint=self.endpoint,
    user_agent=UserAgent(
        comments=[
            f"md/dataset#iterable md/reader_type#{reader_type_string}"
Contributor Author

This follows User Agent 2.0 formatting, but wraps the metadata in brackets. It is slightly different from the dcp/lightning user agents, but hopefully one step towards AWS best practices.

e.g. Iterable dataset, range-based reader:
s3torchconnector/1.4.1 ua/2.0 lang/python#3.12.9 (md/dataset#iterable md/reader_type#range_based)
e.g. dcp:
f"s3torchconnector/1.4.1 ua/2.0 lang/python#3.12.9 (dcp; {torch.__version__})"
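
The two example strings above can be reproduced with a small helper. This is a sketch under assumptions: `build_user_agent` is a hypothetical function, and joining comments with "; " inside one bracketed group is inferred from the dcp example rather than taken from the UserAgent class.

```python
from typing import Sequence


def build_user_agent(prefix: str, comments: Sequence[str]) -> str:
    """Append a bracketed, '; '-separated comment group when comments exist."""
    if not comments:
        return prefix
    return f"{prefix} ({'; '.join(comments)})"


prefix = "s3torchconnector/1.4.1 ua/2.0 lang/python#3.12.9"
reader_type_string = "range_based"

# Single comment carrying both metadata fields, as in the diff above.
iterable_ua = build_user_agent(
    prefix, [f"md/dataset#iterable md/reader_type#{reader_type_string}"]
)
print(iterable_ua)
```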

bucket,
key,
get_object_info,
cast(Callable[[], GetObjectStream], get_stream),
Contributor Author

Used cast to fix a mypy error.
The sequential reader requires Callable[[], GetObjectStream], but the range-based reader requires Callable[[Optional[int], Optional[int]], GetObjectStream] (start/end params). Any better solutions?

Contributor

Using a protocol for get_stream might work: https://stackoverflow.com/a/68392079

By setting the default values to None and None, it might realise that this can be cast nicely to a callable with no parameters?

Contributor Author

I'll look into that, thanks.
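
A rough sketch of the protocol idea from this thread, in case it helps the follow-up. `GetStream`, `read_all`, and `read_range` are hypothetical names, and `Iterator[bytes]` stands in for GetObjectStream. A function whose parameters all have defaults should be accepted by mypy both as a zero-argument callable and as this protocol, so no cast would be needed, though that has not been checked against this codebase.

```python
from typing import Callable, Iterator, Optional, Protocol


class GetStream(Protocol):
    """Callable that accepts no arguments or optional start/end offsets."""

    def __call__(
        self, start: Optional[int] = None, end: Optional[int] = None
    ) -> Iterator[bytes]: ...


DATA = b"0123456789"


def get_stream(
    start: Optional[int] = None, end: Optional[int] = None
) -> Iterator[bytes]:
    """In-memory stand-in for a ranged get_object call."""
    return iter([DATA[start:end]])


def read_all(stream_factory: Callable[[], Iterator[bytes]]) -> bytes:
    """Sequential-style consumer: calls the factory with no arguments."""
    return b"".join(stream_factory())


def read_range(stream_factory: GetStream, start: int, end: int) -> bytes:
    """Range-style consumer: passes explicit start/end offsets."""
    return b"".join(stream_factory(start, end))


full = read_all(get_stream)
part = read_range(get_stream, 2, 5)
```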

@jet-tong jet-tong requested review from IsaevIlya and muddyfish June 19, 2025 09:01
@@ -1,6 +1,7 @@
## TBD

### New features
* Implement range-based S3 Reader for byte range requests, enabling efficient random read patterns (#339)
Contributor

Should include link to docs for readers to learn more

)

def _get_client(self):
    if self._client is None:
        reader_type_string = self._reader_config.reader_type.name.lower()
Contributor

Perhaps this should be a method on ReaderConfig? So it would be self._reader_config.human_readable_type() (or similar). Currently it feels like there's a bit of breaking through the abstraction layers
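
One possible shape for that suggestion, as a sketch only: `ReaderType`, `ReaderConfigSketch`, and `reader_type_string` are hypothetical stand-ins and do not mirror the actual S3ReaderConfig definition.

```python
from dataclasses import dataclass
from enum import Enum


class ReaderType(Enum):  # hypothetical stand-in for the config's reader type
    SEQUENTIAL = 1
    RANGE_BASED = 2


@dataclass(frozen=True)
class ReaderConfigSketch:
    reader_type: ReaderType = ReaderType.SEQUENTIAL

    def reader_type_string(self) -> str:
        """Lower-case name for user-agent metadata, e.g. 'range_based'."""
        return self.reader_type.name.lower()


config = ReaderConfigSketch(ReaderType.RANGE_BASED)
comment = f"md/dataset#iterable md/reader_type#{config.reader_type_string()}"
```

With this, callers such as `_get_client` no longer need to know that the type is an enum with a `name` attribute.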

)

def _get_client(self):
    if self._client is None:
        reader_type_string = self._reader_config.reader_type.name.lower()
Contributor

Similar here

else:
    # Range-based reader case:
    data = b"".join(stream_data)
    start_val = start if start is not None else 0
Contributor

Would start or 0 work here?
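
A quick check of that question, using two hypothetical helper functions. For `start` the short form gives the same result, because the only falsy integer is 0 and it maps to 0 anyway. The same shortcut would not be safe for `end`, where `end=0` would silently become `len(data)`.

```python
from typing import Optional


def slice_explicit(data: bytes, start: Optional[int], end: Optional[int]) -> bytes:
    start_val = start if start is not None else 0
    end_val = end if end is not None else len(data)
    return data[start_val:end_val]


def slice_with_or(data: bytes, start: Optional[int], end: Optional[int]) -> bytes:
    start_val = start or 0      # same result: None and 0 both give 0
    end_val = end or len(data)  # differs: end=0 is treated like None
    return data[start_val:end_val]


data = b"abcdef"
same_for_start = all(
    slice_explicit(data, s, None) == slice_with_or(data, s, None)
    for s in (None, 0, 2)
)
```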

data = b"".join(stream_data)
start_val = start if start is not None else 0
end_val = end if end is not None else len(data)
return iter([data[start_val:end_val]])
Contributor

Ideally we'd validate it works with multiple chunks
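
One way the mock could keep chunk boundaries instead of collapsing the range into a single chunk is sketched below. `ranged_chunks` is a hypothetical helper and assumes a half-open `[start, end)` range.

```python
from typing import Iterator, List, Optional


def ranged_chunks(
    chunks: List[bytes], start: Optional[int] = None, end: Optional[int] = None
) -> Iterator[bytes]:
    """Yield the part of each chunk that falls inside [start, end)."""
    total = sum(len(chunk) for chunk in chunks)
    start_val = 0 if start is None else start
    end_val = total if end is None else min(end, total)
    offset = 0  # absolute position of the current chunk's first byte
    for chunk in chunks:
        lo = max(start_val - offset, 0)
        hi = min(end_val - offset, len(chunk))
        if lo < hi:
            yield chunk[lo:hi]
        offset += len(chunk)


stream_data = [b"abc", b"def", b"ghi"]
pieces = list(ranged_chunks(stream_data, 2, 7))
```

Here the requested range spans all three source chunks, so the reader under test receives three pieces rather than one.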

S3Reader(bucket, key, lambda: None, lambda: [], reader_config=reader_config)


@pytest.mark.parametrize(
Contributor

Could use hypothesis for this

buf = memoryview(bytearray(buf_size))
# We're able to read all the available data or the data that can be accommodated in buf
if buf_size > 0 and total_length > 0:
    assert s3reader.readinto(buf) == buf_size
Contributor

Should this pass? What if buf_size == 20 and len(stream) == 5?

Contributor Author

Yep, this covers the specific case where the buffer size is smaller than the available bytes.
This just copies the sequential test, though, which checks two lines below that the readinto operation does not write size.
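
For the case raised above (a 20-byte buffer over a 5-byte stream), the standard-library contract is that `readinto` returns the number of bytes actually read. The sketch below uses `io.BytesIO` as a stand-in and assumes the S3 readers follow the same contract; `readinto_case` is a hypothetical helper.

```python
import io


def readinto_case(stream_len: int, buf_size: int) -> int:
    """Return how many bytes readinto reports for the given sizes."""
    source = io.BytesIO(bytes(stream_len))
    buf = memoryview(bytearray(buf_size))
    return source.readinto(buf)


larger_buffer = readinto_case(stream_len=5, buf_size=20)
smaller_buffer = readinto_case(stream_len=20, buf_size=5)
```

Under that assumption, an assertion against `min(buf_size, total_length)` would cover both cases without the size precondition.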

# Readinto operation does write size
assert s3reader._size == total_length
# confirm that read data is the same as in source
assert buf[:buf_size] == (b"".join(stream))[:buf_size]
Contributor

Nit: Unnecessary brackets

view = memoryview(buf)

# Get stream for specified byte range
self._stream = self._get_stream(start, end)
Contributor

I would prefer refactoring here. I don't think there would be a performance impact; rather, I'm worried that the logic for determining the correct range could drift apart.
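
A sketch of what such a refactor could look like, with the range computed in one place and used by both read paths. `compute_range`, `read`, and `readinto` here are hypothetical free functions over an in-memory `bytes` object, and the half-open `[start, end)` convention is an assumption.

```python
from typing import Optional, Tuple


def compute_range(
    position: int, size: Optional[int], object_size: int
) -> Tuple[int, int]:
    """Return the half-open byte range [start, end) for a read of `size` bytes."""
    start = min(position, object_size)
    if size is None or size < 0:
        end = object_size  # read to the end of the object
    else:
        end = min(start + size, object_size)
    return start, end


def read(data: bytes, position: int, size: Optional[int] = None) -> bytes:
    start, end = compute_range(position, size, len(data))
    return data[start:end]


def readinto(data: bytes, position: int, buf: bytearray) -> int:
    # Same helper, with the buffer length as the requested size.
    start, end = compute_range(position, len(buf), len(data))
    count = end - start
    buf[:count] = data[start:end]
    return count


target = bytearray(4)
written = readinto(b"abcdefgh", 6, target)
```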

def read(self, size: Optional[int] = None) -> bytes:
    """Read up to size bytes from the current position.

    If size is zero or positive, read that many bytes from S3, or until the end of the object.
Contributor

Should it be until the end of the range?

Contributor Author

The range is determined by size, so:

  • if size >= 0, read 'size' bytes from S3 or till end of object
  • if size < 0 or None, read from self._position till end of range (Or do we add functionality for it to read entire object instead?)

Labels: enhancement (New feature or request), python (Pull requests that update Python code), rust (Pull requests that update Rust code)
3 participants