Skip to content

Commit 1f1d0f6

Browse files
authored
Merge pull request #72 from coreweave/eta/stream-upload-context
fix(stream_io): Finalize temporary files correctly on `__exit__`
2 parents 855c504 + 483dd5c commit 1f1d0f6

File tree

4 files changed

+54
-18
lines changed

4 files changed

+54
-18
lines changed

CHANGELOG.md

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
12+
- File objects opened with `stream_io.open_stream("s3://...", "wb")` for writing
13+
to object storage now correctly upload their content when closed implicitly
14+
at the end of a `with` block, without requiring an explicit call to their
15+
`.close()` method
16+
- Since `TensorSerializer` objects already call `.close()` explicitly on
17+
their output file objects, either when `TensorSerializer.close()` is invoked
18+
or when the `TensorSerializer` is garbage collected, this bug mainly applies
19+
to manual usage of `stream_io.open_stream()` for object storage uploads
20+
not involving a `TensorSerializer`
21+
22+
823
## [2.7.1] - 2023-12-06
924

1025
### Fixed
@@ -265,7 +280,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
265280
- `get_gpu_name`
266281
- `no_init_or_tensor`
267282

268-
[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
283+
[Unreleased]: https://github.com/coreweave/tensorizer/compare/v2.7.1...HEAD
284+
[2.7.1]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
269285
[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.6.0...v2.7.0
270286
[2.6.0]: https://github.com/coreweave/tensorizer/compare/v2.5.1...v2.6.0
271287
[2.5.1]: https://github.com/coreweave/tensorizer/compare/v2.5.0...v2.5.1

tensorizer/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "2.7.1"
1+
__version__ = "2.7.2"

tensorizer/stream_io.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,9 +1053,9 @@ def _infer_credentials(
10531053
def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
10541054
"""
10551055
Close, upload by name, and then delete the file.
1056-
Meant to replace .close() on a particular instance
1057-
of a temporary file-like wrapper object, as an unbound
1058-
callback to a weakref.finalize() registration on the wrapper.
1056+
Meant to be placed as a hook before both .close() and .__exit__()
1057+
on a particular instance of a temporary file-like wrapper object,
1058+
as a callback to a weakref.finalize() registration on the wrapper.
10591059
10601060
The reason this implementation is necessary is really complicated.
10611061
@@ -1077,17 +1077,6 @@ def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
10771077
so they have to buffer it all in memory.
10781078
"""
10791079

1080-
if file.closed:
1081-
# Makes closure idempotent.
1082-
1083-
# If the file object is used as a context
1084-
# manager, close() is called twice (once in the
1085-
# serializer code, once after, when leaving the
1086-
# context).
1087-
1088-
# Without this check, this would trigger two
1089-
# separate uploads.
1090-
return
10911080
try:
10921081
file.close()
10931082
s3_upload(file_name, *upload_args)
@@ -1281,6 +1270,9 @@ def open_stream(
12811270
# with primitive temporary file support (e.g. Windows)
12821271
temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)
12831272

1273+
# Attach a callback to upload the temporary file when it closes.
1274+
# weakref finalizers are idempotent, so this upload callback
1275+
# is guaranteed to run at most once.
12841276
guaranteed_closer = weakref.finalize(
12851277
temp_file,
12861278
_temp_file_closer,
@@ -1291,7 +1283,35 @@ def open_stream(
12911283
s3_secret_access_key,
12921284
s3_endpoint,
12931285
)
1294-
temp_file.close = guaranteed_closer
1286+
1287+
# Always run the close + upload procedure
1288+
# before any code from Python's NamedTemporaryFile wrapper.
1289+
# It isn't safe to call a bound method from a weakref finalizer,
1290+
# but calling a weakref finalizer alongside a bound method
1291+
# creates no problems, other than that the code outside the
1292+
# finalizer is not guaranteed to be run at any point.
1293+
# In this case, the weakref finalizer performs all necessary
1294+
# cleanup itself, but the original NamedTemporaryFile methods
1295+
# are invoked as well, just in case.
1296+
wrapped_close = temp_file.close
1297+
1298+
def close_wrapper():
1299+
guaranteed_closer()
1300+
return wrapped_close()
1301+
1302+
# Python 3.12+ doesn't call NamedTemporaryFile.close() during
1303+
# .__exit__(), so it must be wrapped separately.
1304+
# Since guaranteed_closer is idempotent, it's fine to call it in
1305+
# both methods, even if both are called back-to-back.
1306+
wrapped_exit = temp_file.__exit__
1307+
1308+
def exit_wrapper(exc, value, tb):
1309+
guaranteed_closer()
1310+
return wrapped_exit(exc, value, tb)
1311+
1312+
temp_file.close = close_wrapper
1313+
temp_file.__exit__ = exit_wrapper
1314+
12951315
return temp_file
12961316
else:
12971317
s3_endpoint = s3_endpoint or default_s3_read_endpoint

tests/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
transformers>=4.27.1
2-
moto[s3,server]>=4.1.4
2+
moto[s3,server]>=4.1.4,<5.0.0
33
redis>=5.0.0
44
hiredis>=2.2.0

0 commit comments

Comments
 (0)