
Commit 2687536

Authored by shoyer, with pre-commit-ci[bot], dcherian, and Illviljan
Cache files for different CachingFileManager objects separately (#4879)
* Cache files for different CachingFileManager objects separately

  This means that explicitly opening a file multiple times with ``open_dataset``
  (e.g., after modifying it on disk) now reopens the file from scratch, rather
  than reusing a cached version. If users want to reuse the cached file, they
  can reuse the same xarray object.

  We don't need this for handling many files in Dask (the original motivation
  for caching), because in those cases only a single CachingFileManager is
  created.

  I think this should fix some long-standing usability issues: #4240, #4862

  Conveniently, this also obviates the need for some messy reference counting
  logic.

* Fix whats-new message location
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* Add id to CachingFileManager
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* restrict new test to only netCDF files
* fix whats-new message
* skip test on windows
* Revert "[pre-commit.ci] auto fixes from pre-commit.com hooks" (this reverts commit e637165)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* Revert "Fix whats-new message location" (this reverts commit 6bc80e7)
* fixups
* fix syntax
* tweaks
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fix types for mypy
* add uuid
* restore ref_counts
* doc tweaks
* close files inside test_open_mfdataset_list_attr
* remove unused itertools
* don't use refcounts
* re-enable ref counting
* cleanup
* Apply typing suggestions from code review (Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>)
* fix import of Hashable
* ignore __init__ type
* fix whats-new

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>
Co-authored-by: dcherian <deepak@cherian.net>
1 parent 9df2dfc commit 2687536
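
In user-facing terms, a second explicit open_dataset call no longer returns stale data from the file cache. A minimal sketch of the scenario this fixes (file names are illustrative, not from this commit; per the whats-new entry below, the fix applies to the h5netcdf and scipy backends, while netCDF4-python is blocked by an upstream issue):

    import shutil

    import xarray

    # First explicit open: this call's CachingFileManager caches its own
    # file handle.
    ds_old = xarray.open_dataset("myfile.nc", engine="h5netcdf").load()

    # An external process (or another writer) replaces the file on disk.
    shutil.copy("myfile_updated.nc", "myfile.nc")

    # Second explicit open: previously this could reuse the cached handle
    # and return stale data; now each open_dataset call gets its own cache
    # entry, so the file is reopened from scratch and new values are seen.
    ds_new = xarray.open_dataset("myfile.nc", engine="h5netcdf").load()

    ds_old.close()
    ds_new.close()

To deliberately reuse the cached file, keep using the same xarray object instead of reopening the path.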

File tree

4 files changed: +150 −71 lines changed

doc/whats-new.rst

Lines changed: 4 additions & 0 deletions
@@ -43,6 +43,10 @@ Deprecations
 Bug fixes
 ~~~~~~~~~
 
+- Explicitly opening a file multiple times (e.g., after modifying it on disk)
+  now reopens the file from scratch for h5netcdf and scipy netCDF backends,
+  rather than reusing a cached version (:issue:`4240`, :issue:`4862`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
 
 Documentation
 ~~~~~~~~~~~~~

xarray/backends/file_manager.py

Lines changed: 45 additions & 33 deletions
@@ -3,21 +3,21 @@
 import contextlib
 import io
 import threading
+import uuid
 import warnings
-from typing import Any
+from typing import Any, Hashable
 
 from ..core import utils
 from ..core.options import OPTIONS
 from .locks import acquire
 from .lru_cache import LRUCache
 
 # Global cache for storing open files.
-FILE_CACHE: LRUCache[str, io.IOBase] = LRUCache(
+FILE_CACHE: LRUCache[Any, io.IOBase] = LRUCache(
     maxsize=OPTIONS["file_cache_maxsize"], on_evict=lambda k, v: v.close()
 )
 assert FILE_CACHE.maxsize, "file cache must be at least size one"
 
-
 REF_COUNTS: dict[Any, int] = {}
 
 _DEFAULT_MODE = utils.ReprObject("<unused>")
@@ -85,12 +85,13 @@ def __init__(
         kwargs=None,
         lock=None,
         cache=None,
+        manager_id: Hashable | None = None,
         ref_counts=None,
     ):
-        """Initialize a FileManager.
+        """Initialize a CachingFileManager.
 
-        The cache and ref_counts arguments exist solely to facilitate
-        dependency injection, and should only be set for tests.
+        The cache, manager_id and ref_counts arguments exist solely to
+        facilitate dependency injection, and should only be set for tests.
 
         Parameters
         ----------
@@ -120,6 +121,8 @@ def __init__(
             global variable and contains non-picklable file objects, an
             unpickled FileManager objects will be restored with the default
             cache.
+        manager_id : hashable, optional
+            Identifier for this CachingFileManager.
         ref_counts : dict, optional
             Optional dict to use for keeping track the number of references to
             the same file.
@@ -129,13 +132,17 @@ def __init__(
         self._mode = mode
         self._kwargs = {} if kwargs is None else dict(kwargs)
 
-        self._default_lock = lock is None or lock is False
-        self._lock = threading.Lock() if self._default_lock else lock
+        self._use_default_lock = lock is None or lock is False
+        self._lock = threading.Lock() if self._use_default_lock else lock
 
         # cache[self._key] stores the file associated with this object.
        if cache is None:
             cache = FILE_CACHE
         self._cache = cache
+        if manager_id is None:
+            # Each call to CachingFileManager should separately open files.
+            manager_id = str(uuid.uuid4())
+        self._manager_id = manager_id
         self._key = self._make_key()
 
         # ref_counts[self._key] stores the number of CachingFileManager objects
@@ -153,6 +160,7 @@ def _make_key(self):
             self._args,
             "a" if self._mode == "w" else self._mode,
             tuple(sorted(self._kwargs.items())),
+            self._manager_id,
         )
         return _HashedSequence(value)
 
@@ -223,20 +231,14 @@ def close(self, needs_lock=True):
             if file is not None:
                 file.close()
 
-    def __del__(self):
-        # If we're the only CachingFileManger referencing a unclosed file, we
-        # should remove it from the cache upon garbage collection.
+    def __del__(self) -> None:
+        # If we're the only CachingFileManager referencing an unclosed file,
+        # remove it from the cache upon garbage collection.
         #
-        # Keeping our own count of file references might seem like overkill,
-        # but it's actually pretty common to reopen files with the same
-        # variable name in a notebook or command line environment, e.g., to
-        # fix the parameters used when opening a file:
-        # >>> ds = xarray.open_dataset('myfile.nc')
-        # >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
-        # This second assignment to "ds" drops CPython's ref-count on the first
-        # "ds" argument to zero, which can trigger garbage collections. So if
-        # we didn't check whether another object is referencing 'myfile.nc',
-        # the newly opened file would actually be immediately closed!
+        # We keep track of our own reference count because we don't want to
+        # close files if another identical file manager needs it. This can
+        # happen if a CachingFileManager is pickled and unpickled without
+        # closing the original file.
         ref_count = self._ref_counter.decrement(self._key)
 
         if not ref_count and self._key in self._cache:
@@ -249,30 +251,40 @@ def __del__(self):
 
         if OPTIONS["warn_for_unclosed_files"]:
             warnings.warn(
-                "deallocating {}, but file is not already closed. "
-                "This may indicate a bug.".format(self),
+                f"deallocating {self}, but file is not already closed. "
+                "This may indicate a bug.",
                 RuntimeWarning,
                 stacklevel=2,
             )
 
     def __getstate__(self):
         """State for pickling."""
-        # cache and ref_counts are intentionally omitted: we don't want to try
-        # to serialize these global objects.
-        lock = None if self._default_lock else self._lock
-        return (self._opener, self._args, self._mode, self._kwargs, lock)
+        # cache is intentionally omitted: we don't want to try to serialize
+        # these global objects.
+        lock = None if self._use_default_lock else self._lock
+        return (
+            self._opener,
+            self._args,
+            self._mode,
+            self._kwargs,
+            lock,
+            self._manager_id,
+        )
 
-    def __setstate__(self, state):
+    def __setstate__(self, state) -> None:
         """Restore from a pickle."""
-        opener, args, mode, kwargs, lock = state
-        self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
+        opener, args, mode, kwargs, lock, manager_id = state
+        self.__init__(  # type: ignore
+            opener, *args, mode=mode, kwargs=kwargs, lock=lock, manager_id=manager_id
+        )
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         args_string = ", ".join(map(repr, self._args))
         if self._mode is not _DEFAULT_MODE:
             args_string += f", mode={self._mode!r}"
-        return "{}({!r}, {}, kwargs={})".format(
-            type(self).__name__, self._opener, args_string, self._kwargs
+        return (
+            f"{type(self).__name__}({self._opener!r}, {args_string}, "
+            f"kwargs={self._kwargs}, manager_id={self._manager_id!r})"
         )
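
The heart of the diff above is the cache key: each CachingFileManager now mixes a per-instance identifier (a fresh UUID unless one is injected for tests or restored from a pickle) into the key it uses in FILE_CACHE, so managers created by separate open calls get distinct cache entries, while pickle round-trips preserve the identifier and keep sharing one entry. A self-contained sketch of that keying scheme, not the actual xarray class (SketchFileManager and its attributes are hypothetical):

    import uuid


    class SketchFileManager:
        """Hypothetical stand-in that mimics only the cache-key logic."""

        def __init__(self, opener, *args, manager_id=None, **kwargs):
            self._opener = opener
            self._args = args
            self._kwargs = kwargs
            if manager_id is None:
                # Each new instance gets a fresh UUID, so two managers opened
                # with identical arguments still produce distinct keys.
                manager_id = str(uuid.uuid4())
            self._manager_id = manager_id

        def key(self):
            # The identifier is part of the key, alongside opener/args/kwargs.
            return (
                self._opener,
                self._args,
                tuple(sorted(self._kwargs.items())),
                self._manager_id,
            )


    m1 = SketchFileManager(open, "example.nc", mode="r")
    m2 = SketchFileManager(open, "example.nc", mode="r")
    assert m1.key() != m2.key()  # separate open calls, separate cache entries

    # Passing the same manager_id reproduces the pickle round-trip, where
    # __getstate__/__setstate__ preserve it: the keys (and cache entry) match.
    m3 = SketchFileManager(open, "example.nc", mode="r", manager_id=m1._manager_id)
    assert m1.key() == m3.key()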

xarray/tests/test_backends.py

Lines changed: 52 additions & 15 deletions
@@ -1207,6 +1207,39 @@ def test_multiindex_not_implemented(self) -> None:
         pass
 
 
+class NetCDFBase(CFEncodedBase):
+    """Tests for all netCDF3 and netCDF4 backends."""
+
+    @pytest.mark.skipif(
+        ON_WINDOWS, reason="Windows does not allow modifying open files"
+    )
+    def test_refresh_from_disk(self) -> None:
+        # regression test for https://github.com/pydata/xarray/issues/4862
+
+        with create_tmp_file() as example_1_path:
+            with create_tmp_file() as example_1_modified_path:
+
+                with open_example_dataset("example_1.nc") as example_1:
+                    self.save(example_1, example_1_path)
+
+                    example_1.rh.values += 100
+                    self.save(example_1, example_1_modified_path)
+
+                a = open_dataset(example_1_path, engine=self.engine).load()
+
+                # Simulate external process modifying example_1.nc while this script is running
+                shutil.copy(example_1_modified_path, example_1_path)
+
+                # Reopen example_1.nc (modified) as `b`; note that `a` has NOT been closed
+                b = open_dataset(example_1_path, engine=self.engine).load()
+
+                try:
+                    assert not np.array_equal(a.rh.values, b.rh.values)
+                finally:
+                    a.close()
+                    b.close()
+
+
 _counter = itertools.count()
 
 
@@ -1238,7 +1271,7 @@ def create_tmp_files(
     yield files
 
 
-class NetCDF4Base(CFEncodedBase):
+class NetCDF4Base(NetCDFBase):
     """Tests for both netCDF4-python and h5netcdf."""
 
     engine: T_NetcdfEngine = "netcdf4"
@@ -1595,6 +1628,10 @@ def test_setncattr_string(self) -> None:
         assert_array_equal(one_element_list_of_strings, totest.attrs["bar"])
         assert one_string == totest.attrs["baz"]
 
+    @pytest.mark.skip(reason="https://github.com/Unidata/netcdf4-python/issues/1195")
+    def test_refresh_from_disk(self) -> None:
+        super().test_refresh_from_disk()
+
 
 @requires_netCDF4
 class TestNetCDF4AlreadyOpen:
@@ -3182,20 +3219,20 @@ def test_open_mfdataset_list_attr() -> None:
 
     with create_tmp_files(2) as nfiles:
         for i in range(2):
-            f = Dataset(nfiles[i], "w")
-            f.createDimension("x", 3)
-            vlvar = f.createVariable("test_var", np.int32, ("x"))
-            # here create an attribute as a list
-            vlvar.test_attr = [f"string a {i}", f"string b {i}"]
-            vlvar[:] = np.arange(3)
-            f.close()
-        ds1 = open_dataset(nfiles[0])
-        ds2 = open_dataset(nfiles[1])
-        original = xr.concat([ds1, ds2], dim="x")
-        with xr.open_mfdataset(
-            [nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
-        ) as actual:
-            assert_identical(actual, original)
+            with Dataset(nfiles[i], "w") as f:
+                f.createDimension("x", 3)
+                vlvar = f.createVariable("test_var", np.int32, ("x"))
+                # here create an attribute as a list
+                vlvar.test_attr = [f"string a {i}", f"string b {i}"]
+                vlvar[:] = np.arange(3)
+
+        with open_dataset(nfiles[0]) as ds1:
+            with open_dataset(nfiles[1]) as ds2:
+                original = xr.concat([ds1, ds2], dim="x")
+                with xr.open_mfdataset(
+                    [nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
+                ) as actual:
+                    assert_identical(actual, original)
 
 
 @requires_scipy_or_netCDF4

xarray/tests/test_backends_file_manager.py

Lines changed: 49 additions & 23 deletions
@@ -7,6 +7,7 @@
 
 import pytest
 
+# from xarray.backends import file_manager
 from xarray.backends.file_manager import CachingFileManager
 from xarray.backends.lru_cache import LRUCache
 from xarray.core.options import set_options
@@ -89,55 +90,80 @@ def test_file_manager_repr() -> None:
     assert "my-file" in repr(manager)
 
 
-def test_file_manager_refcounts() -> None:
+def test_file_manager_cache_and_refcounts() -> None:
     mock_file = mock.Mock()
     opener = mock.Mock(spec=open, return_value=mock_file)
     cache: dict = {}
     ref_counts: dict = {}
 
     manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
     assert ref_counts[manager._key] == 1
+
+    assert not cache
     manager.acquire()
-    assert cache
+    assert len(cache) == 1
 
-    manager2 = CachingFileManager(
-        opener, "filename", cache=cache, ref_counts=ref_counts
-    )
-    assert cache
-    assert manager._key == manager2._key
-    assert ref_counts[manager._key] == 2
+    with set_options(warn_for_unclosed_files=False):
+        del manager
+        gc.collect()
+
+    assert not ref_counts
+    assert not cache
+
+
+def test_file_manager_cache_repeated_open() -> None:
+    mock_file = mock.Mock()
+    opener = mock.Mock(spec=open, return_value=mock_file)
+    cache: dict = {}
+
+    manager = CachingFileManager(opener, "filename", cache=cache)
+    manager.acquire()
+    assert len(cache) == 1
+
+    manager2 = CachingFileManager(opener, "filename", cache=cache)
+    manager2.acquire()
+    assert len(cache) == 2
 
     with set_options(warn_for_unclosed_files=False):
         del manager
         gc.collect()
 
-    assert cache
-    assert ref_counts[manager2._key] == 1
-    mock_file.close.assert_not_called()
+    assert len(cache) == 1
 
     with set_options(warn_for_unclosed_files=False):
         del manager2
         gc.collect()
 
-    assert not ref_counts
     assert not cache
 
 
-def test_file_manager_replace_object() -> None:
-    opener = mock.Mock()
+def test_file_manager_cache_with_pickle(tmpdir) -> None:
+
+    path = str(tmpdir.join("testing.txt"))
+    with open(path, "w") as f:
+        f.write("data")
     cache: dict = {}
-    ref_counts: dict = {}
 
-    manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-    manager.acquire()
-    assert ref_counts[manager._key] == 1
-    assert cache
+    with mock.patch("xarray.backends.file_manager.FILE_CACHE", cache):
+        assert not cache
 
-    manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-    assert ref_counts[manager._key] == 1
-    assert cache
+        manager = CachingFileManager(open, path, mode="r")
+        manager.acquire()
+        assert len(cache) == 1
 
-    manager.close()
+        manager2 = pickle.loads(pickle.dumps(manager))
+        manager2.acquire()
+        assert len(cache) == 1
+
+        with set_options(warn_for_unclosed_files=False):
+            del manager
+            gc.collect()
+        # assert len(cache) == 1
+
+        with set_options(warn_for_unclosed_files=False):
+            del manager2
+            gc.collect()
+        assert not cache
 
 
 def test_file_manager_write_consecutive(tmpdir, file_cache) -> None:
