Skip to content

Commit 17f425a

Browse files
committed
Implement trio.abc.AsyncResource (ie: aclose()) for all datastores
Lot's of “code change” unfortunately, since all the tests were convert to properly utilize this facility everywhere, which ended indenting just about every test line by +1.
1 parent f40f5b9 commit 17f425a

11 files changed

+578
-453
lines changed

datastore/adapter/_support.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import typing
22

3+
import trio
4+
35
import datastore
46

57
T_co = typing.TypeVar("T_co", covariant=True)
@@ -58,3 +60,24 @@ def remove_datastore(self, store: DS) -> None:
5860
def insert_datastore(self, index: int, store: DS) -> None:
5961
"""Inserts datastore `store` into this collection at `index`."""
6062
self._stores.insert(index, store)
63+
64+
async def _stores_cleanup(self) -> None:
65+
"""Closes and removes all added datastores"""
66+
errors: typing.List[Exception] = []
67+
68+
while len(self._stores):
69+
store = self._stores.pop()
70+
71+
try:
72+
await store.aclose()
73+
except trio.Cancelled:
74+
pass # We check for cancellation later on
75+
except Exception as error:
76+
errors.append(error)
77+
78+
# Ensure error propagation
79+
if errors:
80+
raise trio.MultiError(errors)
81+
82+
# Ensure cancellation is propagated
83+
await trio.sleep(0)

datastore/adapter/sharded.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ def _shard_query_generator(self, query):
9292

9393
if shard_query.limit <= 0:
9494
break # we're already done!
95+
96+
97+
async def aclose(self) -> None:
98+
"""Closes and removes all added datastores"""
99+
await self._stores_cleanup()
95100

96101

97102

datastore/adapter/tiered.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ async def contains(self, key: datastore.Key) -> bool:
143143
if await store.contains(key):
144144
return True
145145
return False
146+
147+
148+
async def aclose(self) -> None:
149+
"""Closes and removes all added datastores"""
150+
await self._stores_cleanup()
146151

147152

148153
class BinaryAdapter(

datastore/core/binarystore.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def is_valid_value_type(value: util.stream.ArbitraryReceiveStream) -> bool:
2929

3030

3131

32-
class Datastore:
32+
class Datastore(trio.abc.AsyncResource):
3333
"""A Datastore represents storage for any string key to binary value pair.
3434
3535
Datastores are general enough to be backed by all kinds of different storage:
@@ -166,6 +166,16 @@ async def get_all(self, key: key_.Key) -> bytes:
166166
An internal error occurred
167167
"""
168168
return await (await self.get(key)).collect()
169+
170+
171+
async def aclose(self) -> None:
172+
"""Closes this any resources held by this datastore, possibly blocking
173+
174+
Carefully read the documentation of :class:`trio.abc.AsyncResource`,
175+
particularily with regards to concellation and forceful closings, when
176+
implementating this.
177+
"""
178+
pass
169179

170180

171181

@@ -284,6 +294,12 @@ async def contains(self, key: key_.Key) -> bool:
284294

285295
def __len__(self) -> int:
286296
return sum(map(len, self._items.values()))
297+
298+
299+
async def aclose(self) -> None:
300+
"""Deletes all items from this datastore"""
301+
self._items.clear()
302+
await super().aclose()
287303

288304

289305

@@ -406,6 +422,19 @@ async def contains(self, key: key_.Key) -> bool:
406422
return await self.child_datastore.contains(key)
407423
else:
408424
return await Datastore.contains(self, key)
425+
426+
427+
async def aclose(self) -> None:
428+
"""Closes this any resources held by the child datastore
429+
430+
Carefully read the documentation of :class:`trio.abc.AsyncResource`,
431+
particularily with regards to concellation and forceful closings, when
432+
implementating this.
433+
"""
434+
try:
435+
await self.child_datastore.aclose()
436+
finally:
437+
await super().aclose()
409438

410439

411440
Datastore.ADAPTER_T = Adapter

datastore/core/objectstore.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def is_valid_value_type(value: util.stream.ArbitraryReceiveChannel) -> bool:
3232
)) and not isinstance(value, (str, bytes))
3333

3434

35-
class Datastore(typing.Generic[T_co]):
35+
class Datastore(typing.Generic[T_co], trio.abc.AsyncResource):
3636
"""A Datastore represents storage for any string key to an arbitrary Python object pair.
3737
3838
Datastores are general enough to be backed by all kinds of different storage:
@@ -193,6 +193,16 @@ async def get_all(self, key: key_.Key) -> typing.List[T_co]:
193193
An internal error occurred
194194
"""
195195
return await (await self.get(key)).collect()
196+
197+
198+
async def aclose(self) -> None:
199+
"""Closes this any resources held by this datastore, possibly blocking
200+
201+
Carefully read the documentation of :class:`trio.abc.AsyncResource`,
202+
particularily with regards to concellation and forceful closings, when
203+
implementating this.
204+
"""
205+
pass
196206

197207

198208

@@ -336,6 +346,12 @@ async def query(self, query: query_.Query) -> query_.Cursor:
336346

337347
def __len__(self) -> int:
338348
return sum(map(len, self._items.values()))
349+
350+
351+
async def aclose(self) -> None:
352+
"""Deletes all items from this datastore"""
353+
self._items.clear()
354+
await super().aclose()
339355

340356

341357

@@ -483,6 +499,19 @@ async def contains(self, key: key_.Key) -> bool:
483499
return await self.child_datastore.contains(key)
484500
else:
485501
return await Datastore.contains(self, key)
502+
503+
504+
async def aclose(self) -> None:
505+
"""Closes this any resources held by the child datastore
506+
507+
Carefully read the documentation of :class:`trio.abc.AsyncResource`,
508+
particularily with regards to concellation and forceful closings, when
509+
implementating this.
510+
"""
511+
try:
512+
await self.child_datastore.aclose()
513+
finally:
514+
await super().aclose()
486515

487516

488517
Datastore.ADAPTER_T = Adapter

tests/adapter/test_directory.py

Lines changed: 89 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextlib
12
import logging
23
import typing
34

@@ -10,9 +11,14 @@
1011

1112
@trio.testing.trio_test
1213
async def test_dir_simple(DatastoreTests):
13-
s1 = datastore.adapter.directory.ObjectDatastore(datastore.ObjectDictDatastore())
14-
s2 = datastore.adapter.directory.ObjectDatastore(datastore.ObjectDictDatastore())
15-
await DatastoreTests([s1, s2]).subtest_simple()
14+
async with contextlib.AsyncExitStack() as stack:
15+
s1 = stack.push_async_exit(
16+
datastore.adapter.directory.ObjectDatastore(datastore.ObjectDictDatastore())
17+
)
18+
s2 = stack.push_async_exit(
19+
datastore.adapter.directory.ObjectDatastore(datastore.ObjectDictDatastore())
20+
)
21+
await DatastoreTests([s1, s2]).subtest_simple()
1622

1723

1824
##########################
@@ -29,106 +35,103 @@ class ObjectDirectoryDictDatastore(
2935

3036
@trio.testing.trio_test
3137
async def test_directory_simple(DatastoreTests):
32-
s1 = ObjectDirectoryDictDatastore()
33-
s2 = ObjectDirectoryDictDatastore()
34-
await DatastoreTests([s1, s2]).subtest_simple()
38+
async with contextlib.AsyncExitStack() as stack:
39+
s1 = stack.push_async_exit(ObjectDirectoryDictDatastore())
40+
s2 = stack.push_async_exit(ObjectDirectoryDictDatastore())
41+
await DatastoreTests([s1, s2]).subtest_simple()
3542

3643

3744
@trio.testing.trio_test
3845
async def test_directory_init():
39-
ds = ObjectDirectoryDictDatastore()
46+
async with ObjectDirectoryDictDatastore() as ds:
47+
# initialize directory at /foo
48+
dir_key = datastore.Key('/foo')
49+
await ds.directory(dir_key)
50+
assert await ds.get_all(dir_key) == []
4051

41-
# initialize directory at /foo
42-
dir_key = datastore.Key('/foo')
43-
await ds.directory(dir_key)
44-
assert await ds.get_all(dir_key) == []
52+
# can add to dir
53+
bar_key = datastore.Key('/foo/bar')
54+
await ds.directory_add(dir_key, bar_key)
55+
assert await ds.get_all(dir_key) == [str(bar_key)]
4556

46-
# can add to dir
47-
bar_key = datastore.Key('/foo/bar')
48-
await ds.directory_add(dir_key, bar_key)
49-
assert await ds.get_all(dir_key) == [str(bar_key)]
50-
51-
# re-init does not wipe out directory at /foo
52-
dir_key = datastore.Key('/foo')
53-
with pytest.raises(KeyError):
54-
await ds.directory(dir_key, exist_ok=False)
55-
await ds.directory(dir_key, exist_ok=True)
56-
assert await ds.get_all(dir_key) == [str(bar_key)]
57+
# re-init does not wipe out directory at /foo
58+
dir_key = datastore.Key('/foo')
59+
with pytest.raises(KeyError):
60+
await ds.directory(dir_key, exist_ok=False)
61+
await ds.directory(dir_key, exist_ok=True)
62+
assert await ds.get_all(dir_key) == [str(bar_key)]
5763

5864

5965
@trio.testing.trio_test
6066
async def test_directory_basic():
61-
ds = ObjectDirectoryDictDatastore()
62-
63-
# initialize directory at /foo
64-
dir_key = datastore.Key('/foo')
65-
await ds.directory(dir_key)
66-
67-
# adding directory entries
68-
bar_key = datastore.Key('/foo/bar')
69-
baz_key = datastore.Key('/foo/baz')
70-
await ds.directory_add(dir_key, bar_key)
71-
await ds.directory_add(dir_key, baz_key)
72-
keys = [key async for key in ds.directory_read(dir_key)]
73-
assert keys == [bar_key, baz_key]
74-
75-
# removing directory entries
76-
await ds.directory_remove(dir_key, bar_key)
77-
keys = [key async for key in ds.directory_read(dir_key)]
78-
assert keys == [baz_key]
79-
80-
await ds.directory_remove(dir_key, baz_key)
81-
keys = [key async for key in ds.directory_read(dir_key)]
82-
assert keys == []
83-
84-
# generator
85-
with pytest.raises(StopAsyncIteration):
86-
gen = ds.directory_read(dir_key).__aiter__()
87-
await gen.__anext__()
67+
async with ObjectDirectoryDictDatastore() as ds:
68+
# initialize directory at /foo
69+
dir_key = datastore.Key('/foo')
70+
await ds.directory(dir_key)
71+
72+
# adding directory entries
73+
bar_key = datastore.Key('/foo/bar')
74+
baz_key = datastore.Key('/foo/baz')
75+
await ds.directory_add(dir_key, bar_key)
76+
await ds.directory_add(dir_key, baz_key)
77+
keys = [key async for key in ds.directory_read(dir_key)]
78+
assert keys == [bar_key, baz_key]
79+
80+
# removing directory entries
81+
await ds.directory_remove(dir_key, bar_key)
82+
keys = [key async for key in ds.directory_read(dir_key)]
83+
assert keys == [baz_key]
84+
85+
await ds.directory_remove(dir_key, baz_key)
86+
keys = [key async for key in ds.directory_read(dir_key)]
87+
assert keys == []
88+
89+
# generator
90+
with pytest.raises(StopAsyncIteration):
91+
gen = ds.directory_read(dir_key).__aiter__()
92+
await gen.__anext__()
8893

8994

9095
@trio.testing.trio_test
9196
async def test_directory_double_add():
92-
ds = ObjectDirectoryDictDatastore()
93-
94-
# initialize directory at /foo
95-
dir_key = datastore.Key('/foo')
96-
await ds.directory(dir_key)
97+
async with ObjectDirectoryDictDatastore() as ds:
98+
# initialize directory at /foo
99+
dir_key = datastore.Key('/foo')
100+
await ds.directory(dir_key)
97101

98-
# adding directory entries
99-
bar_key = datastore.Key('/foo/bar')
100-
baz_key = datastore.Key('/foo/baz')
101-
await ds.directory_add(dir_key, bar_key)
102-
await ds.directory_add(dir_key, baz_key)
103-
await ds.directory_add(dir_key, bar_key)
104-
await ds.directory_add(dir_key, baz_key)
105-
await ds.directory_add(dir_key, baz_key)
106-
await ds.directory_add(dir_key, bar_key)
102+
# adding directory entries
103+
bar_key = datastore.Key('/foo/bar')
104+
baz_key = datastore.Key('/foo/baz')
105+
await ds.directory_add(dir_key, bar_key)
106+
await ds.directory_add(dir_key, baz_key)
107+
await ds.directory_add(dir_key, bar_key)
108+
await ds.directory_add(dir_key, baz_key)
109+
await ds.directory_add(dir_key, baz_key)
110+
await ds.directory_add(dir_key, bar_key)
107111

108-
keys = [key async for key in ds.directory_read(dir_key)]
109-
assert keys == [bar_key, baz_key]
112+
keys = [key async for key in ds.directory_read(dir_key)]
113+
assert keys == [bar_key, baz_key]
110114

111115

112116
@trio.testing.trio_test
113117
async def test_directory_remove():
114-
ds = ObjectDirectoryDictDatastore()
115-
116-
# initialize directory at /foo
117-
dir_key = datastore.Key('/foo')
118-
await ds.directory(dir_key)
119-
120-
# adding directory entries
121-
bar_key = datastore.Key('/foo/bar')
122-
baz_key = datastore.Key('/foo/baz')
123-
await ds.directory_add(dir_key, bar_key)
124-
await ds.directory_add(dir_key, baz_key)
125-
keys = [key async for key in ds.directory_read(dir_key)]
126-
assert keys == [bar_key, baz_key]
127-
128-
# removing directory entries
129-
await ds.directory_remove(dir_key, bar_key)
130-
await ds.directory_remove(dir_key, bar_key, missing_ok=True)
131-
with pytest.raises(KeyError):
132-
await ds.directory_remove(dir_key, bar_key, missing_ok=False)
133-
keys = [key async for key in ds.directory_read(dir_key)]
134-
assert keys == [baz_key]
118+
async with ObjectDirectoryDictDatastore() as ds:
119+
# initialize directory at /foo
120+
dir_key = datastore.Key('/foo')
121+
await ds.directory(dir_key)
122+
123+
# adding directory entries
124+
bar_key = datastore.Key('/foo/bar')
125+
baz_key = datastore.Key('/foo/baz')
126+
await ds.directory_add(dir_key, bar_key)
127+
await ds.directory_add(dir_key, baz_key)
128+
keys = [key async for key in ds.directory_read(dir_key)]
129+
assert keys == [bar_key, baz_key]
130+
131+
# removing directory entries
132+
await ds.directory_remove(dir_key, bar_key)
133+
await ds.directory_remove(dir_key, bar_key, missing_ok=True)
134+
with pytest.raises(KeyError):
135+
await ds.directory_remove(dir_key, bar_key, missing_ok=False)
136+
keys = [key async for key in ds.directory_read(dir_key)]
137+
assert keys == [baz_key]

0 commit comments

Comments
 (0)