Skip to content

Commit 9b7ba4d

Browse files
authored
Merge pull request #189 from coreweave/eta/object-storage
feat(stream_io): Improve object storage config handling
2 parents 9241bc8 + be337a0 commit 9b7ba4d

File tree

4 files changed

+238
-103
lines changed

4 files changed

+238
-103
lines changed

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,28 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
[Unreleased]
9+
10+
### Added
11+
12+
- `stream_io.open_stream()` now respects Boto3's configuration files
13+
and environment variables when searching for object storage credentials to use
14+
15+
### Fixed
16+
17+
- `stream_io.open_stream()` now uses virtual-hosted-style
18+
bucket addressing for the `cwobject.com` and `cwlota.com` endpoints
19+
- `stream_io.open_stream()` now allows the `use_https` entry of `.s3cfg`
20+
configuration files to fill in its `force_http` parameter if `force_http` is
21+
not explicitly specified as `True` or `False`
22+
- `TensorSerializer` no longer throws an error when attempting to serialize
23+
very large tensors on some non-Linux platforms
24+
- Object storage uploads managed by `stream_io.open_stream()` now finalize
25+
correctly on Python 3.12+ even without an explicit call to their `close()`
26+
method
27+
- A fix for this was originally implemented in release 2.7.2,
28+
but it only worked for Python versions below 3.12
29+
830
[2.9.3] - 2025-05-09
931

1032
### Changed
@@ -424,6 +446,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
424446
- `get_gpu_name`
425447
- `no_init_or_tensor`
426448

449+
[Unreleased]: https://github.com/coreweave/tensorizer/compare/v2.9.3...HEAD
427450
[2.9.3]: https://github.com/coreweave/tensorizer/compare/v2.9.2...v2.9.3
428451
[2.9.2]: https://github.com/coreweave/tensorizer/compare/v2.9.1...v2.9.2
429452
[2.9.1]: https://github.com/coreweave/tensorizer/compare/v2.9.0...v2.9.1

tensorizer/serialization.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import queue
2222
import stat
2323
import struct
24+
import sys
2425
import threading
2526
import time
2627
import types
@@ -3506,16 +3507,33 @@ def _pwrite(
35063507
raise RuntimeError("pwrite was called before being initialized")
35073508

35083509
@staticmethod
3509-
def _mv_suffix(data: "collections.abc.Buffer", start: int):
3510+
def _mv_slice(data: "collections.abc.Buffer", s: slice):
35103511
if not isinstance(data, memoryview):
35113512
data = memoryview(data)
35123513
try:
35133514
if data.ndim != 1 or data.format != "B":
35143515
data = data.cast("B")
3515-
return data[start:]
3516+
return data[s]
35163517
finally:
35173518
del data
35183519

# On Linux, os.pwrite accepts arbitrarily large buffers (completing short
# writes is already handled by the caller), so it can be aliased directly.
if sys.platform == "linux":
    _pwrite_compat = staticmethod(os.pwrite)
else:

    @staticmethod
    def _pwrite_compat(_fd: int, _str, _offset: int, /) -> int:
        # Some systems error on single I/O calls larger than the maximum
        # value of a signed 32-bit integer, so limit os.pwrite calls
        # to a maximum size of about one memory page less than that
        MAX_LEN: typing.Final[int] = 2147479552

        if TensorSerializer._buffer_size(_str) > MAX_LEN:
            # Write only the first MAX_LEN bytes and report a short write;
            # the retry loop in the caller issues further calls for the
            # remainder of the buffer.
            with TensorSerializer._mv_slice(_str, slice(MAX_LEN)) as mv:
                return os.pwrite(_fd, mv, _offset)

        return os.pwrite(_fd, _str, _offset)
35193537
def _pwrite_syscall(
35203538
self, data, offset: int, verify: Union[bool, int] = True
35213539
) -> int:
@@ -3525,14 +3543,14 @@ def _pwrite_syscall(
35253543
expected_bytes_written: int = (
35263544
verify if isinstance(verify, int) else self._buffer_size(data)
35273545
)
3528-
bytes_just_written: int = os.pwrite(self._fd, data, offset)
3546+
bytes_just_written: int = self._pwrite_compat(self._fd, data, offset)
35293547
if bytes_just_written > 0:
35303548
bytes_written += bytes_just_written
35313549
while bytes_written < expected_bytes_written and bytes_just_written > 0:
35323550
# Writes larger than ~2 GiB may not complete in a single pwrite call
35333551
offset += bytes_just_written
3534-
with self._mv_suffix(data, bytes_written) as mv:
3535-
bytes_just_written = os.pwrite(self._fd, mv, offset)
3552+
with self._mv_slice(data, slice(bytes_written, None)) as mv:
3553+
bytes_just_written = self._pwrite_compat(self._fd, mv, offset)
35363554
if bytes_just_written > 0:
35373555
bytes_written += bytes_just_written
35383556
if isinstance(verify, int) or verify:
@@ -3553,7 +3571,7 @@ def _write(self, data, expected_bytes_written: Optional[int] = None) -> int:
35533571
if bytes_just_written > expected_bytes_written:
35543572
raise ValueError("Wrote more data than expected")
35553573
while bytes_written < expected_bytes_written and bytes_just_written > 0:
3556-
with self._mv_suffix(data, bytes_written) as mv:
3574+
with self._mv_slice(data, slice(bytes_written, None)) as mv:
35573575
bytes_just_written = self._file.write(mv)
35583576
bytes_written += bytes_just_written
35593577
return bytes_written

0 commit comments

Comments
 (0)