Skip to content

Commit 8849642

Browse files
committed
fix(stream_io): Finalize temporary files correctly on __exit__
The temporary file finalizer's check that the file is not already closed was causing the upload to be skipped if the stream was used as a context manager without an explicit call to .close(), because CPython's implementation of .__exit__() for NamedTemporaryFile objects closes the underlying file before calling the wrapper's own .close() method. After changing to `weakref.finalize` in commit 3fd7f1e, uploading temporary files became inherently idempotent, so the check that the file is not already closed is no longer necessary anyway. This change deletes that check. Furthermore, changes to the NamedTemporaryFile implementation with the addition of its delete_on_close parameter in Python 3.12 (python/cpython#97015) make its .__exit__() method no longer call its .close() method at all, so this change also embeds the finalizer as a hook in a wrapper around both .close() and .__exit__() separately.
1 parent c46ed35 commit 8849642

File tree

3 files changed

+53
-17
lines changed

3 files changed

+53
-17
lines changed

CHANGELOG.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
12+
- File objects opened with `stream_io.open_stream("s3://...", "wb")` for writing
13+
to object storage now correctly upload their content when closed implicitly
14+
at the end of a `with` block, without requiring an explicit call to their
15+
`.close()` method
16+
- Since `TensorSerializer` objects already call `.close()` explicitly on
17+
their output file objects, either when `TensorSerializer.close()` is invoked
18+
or when the `TensorSerializer` is garbage collected, this bug mainly applies
19+
to manual usage of `stream_io.open_stream()` for object storage uploads
20+
not involving a `TensorSerializer`
21+
22+
823
## [2.7.1] - 2023-12-06
924

1025
### Fixed
@@ -265,7 +280,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
265280
- `get_gpu_name`
266281
- `no_init_or_tensor`
267282

268-
[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
283+
[Unreleased]: https://github.com/coreweave/tensorizer/compare/v2.7.1...HEAD
284+
[2.7.1]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
269285
[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.6.0...v2.7.0
270286
[2.6.0]: https://github.com/coreweave/tensorizer/compare/v2.5.1...v2.6.0
271287
[2.5.1]: https://github.com/coreweave/tensorizer/compare/v2.5.0...v2.5.1

tensorizer/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "2.7.1"
1+
__version__ = "2.7.2"

tensorizer/stream_io.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,9 +1053,9 @@ def _infer_credentials(
10531053
def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
10541054
"""
10551055
Close, upload by name, and then delete the file.
1056-
Meant to replace .close() on a particular instance
1057-
of a temporary file-like wrapper object, as an unbound
1058-
callback to a weakref.finalize() registration on the wrapper.
1056+
Meant to be placed as a hook before both .close() and .__exit__()
1057+
on a particular instance of a temporary file-like wrapper object,
1058+
as a callback to a weakref.finalize() registration on the wrapper.
10591059
10601060
The reason this implementation is necessary is really complicated.
10611061
@@ -1077,17 +1077,6 @@ def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
10771077
so they have to buffer it all in memory.
10781078
"""
10791079

1080-
if file.closed:
1081-
# Makes closure idempotent.
1082-
1083-
# If the file object is used as a context
1084-
# manager, close() is called twice (once in the
1085-
# serializer code, once after, when leaving the
1086-
# context).
1087-
1088-
# Without this check, this would trigger two
1089-
# separate uploads.
1090-
return
10911080
try:
10921081
file.close()
10931082
s3_upload(file_name, *upload_args)
@@ -1281,6 +1270,9 @@ def open_stream(
12811270
# with primitive temporary file support (e.g. Windows)
12821271
temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)
12831272

1273+
# Attach a callback to upload the temporary file when it closes.
1274+
# weakref finalizers are idempotent, so this upload callback
1275+
# is guaranteed to run at most once.
12841276
guaranteed_closer = weakref.finalize(
12851277
temp_file,
12861278
_temp_file_closer,
@@ -1291,7 +1283,35 @@ def open_stream(
12911283
s3_secret_access_key,
12921284
s3_endpoint,
12931285
)
1294-
temp_file.close = guaranteed_closer
1286+
1287+
# Always run the close + upload procedure
1288+
# before any code from Python's NamedTemporaryFile wrapper.
1289+
# It isn't safe to call a bound method from a weakref finalizer,
1290+
# but calling a weakref finalizer alongside a bound method
1291+
# creates no problems, other than that the code outside the
1292+
# finalizer is not guaranteed to be run at any point.
1293+
# In this case, the weakref finalizer performs all necessary
1294+
# cleanup itself, but the original NamedTemporaryFile methods
1295+
# are invoked as well, just in case.
1296+
wrapped_close = temp_file.close
1297+
1298+
def close_wrapper():
1299+
guaranteed_closer()
1300+
return wrapped_close()
1301+
1302+
# Python 3.12+ doesn't call NamedTemporaryFile.close() during
1303+
# .__exit__(), so it must be wrapped separately.
1304+
# Since guaranteed_closer is idempotent, it's fine to call it in
1305+
# both methods, even if both are called back-to-back.
1306+
wrapped_exit = temp_file.__exit__
1307+
1308+
def exit_wrapper(exc, value, tb):
1309+
guaranteed_closer()
1310+
return wrapped_exit(exc, value, tb)
1311+
1312+
temp_file.close = close_wrapper
1313+
temp_file.__exit__ = exit_wrapper
1314+
12951315
return temp_file
12961316
else:
12971317
s3_endpoint = s3_endpoint or default_s3_read_endpoint

0 commit comments

Comments
 (0)