Skip to content

Commit 28eef32

Browse files
authored
COH-32325 - Fix issue with cache destroy when the stream handler has been closed (#242)
* COH-32325 - Fix issue with cache destroy when the stream handler has been closed
1 parent 5501fe6 commit 28eef32

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

src/coherence/client.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -1068,7 +1068,6 @@ def on_destroyed(name: str) -> None:
10681068
if name == cache_name and not this.destroyed:
10691069
this._events_manager._close()
10701070
this._destroyed = True
1071-
this._released = True
10721071
emitter.emit(MapLifecycleEvent.DESTROYED.value, name)
10731072

10741073
# noinspection PyProtectedMember
@@ -1289,10 +1288,13 @@ async def clear(self) -> None:
12891288
await self._near_cache.clear()
12901289

12911290
async def destroy(self) -> None:
1292-
self._internal_emitter.once(MapLifecycleEvent.DESTROYED.value)
1293-
self._internal_emitter.emit(MapLifecycleEvent.DESTROYED.value, self.name)
1294-
dispatcher: Dispatcher = self._request_factory.destroy_request()
1295-
await dispatcher.dispatch(self._stream_handler)
1291+
if not self._stream_handler._closed:
1292+
dispatcher: Dispatcher = self._request_factory.destroy_request()
1293+
await dispatcher.dispatch(self._stream_handler)
1294+
# Now do everything that is done for release
1295+
self._internal_emitter.once(MapLifecycleEvent.RELEASED.value)
1296+
self._internal_emitter.emit(MapLifecycleEvent.RELEASED.value, self.name)
1297+
await self._stream_handler.close()
12961298

12971299
async def release(self) -> None:
12981300
if self.active:

tests/e2e/test_client.py

+41-1
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,46 @@ def callback(n: str) -> None:
495495
await session.close()
496496

497497

498+
# noinspection PyShadowingNames,DuplicatedCode
499+
@pytest.mark.asyncio(loop_scope="function")
500+
async def test_cache_destroy_event() -> None:
501+
session: Session = await tests.get_session()
502+
cache: NamedCache[str, str] = await session.get_cache("test-" + str(int(time() * 1000)))
503+
name: str = "UNSET"
504+
destroy_event: Event = Event()
505+
release_event: Event = Event()
506+
507+
def destroy_callback(n: str) -> None:
508+
nonlocal name
509+
name = n
510+
destroy_event.set()
511+
512+
cache.on(MapLifecycleEvent.DESTROYED, destroy_callback)
513+
514+
def release_callback(n: str) -> None:
515+
nonlocal name
516+
name = n
517+
release_event.set()
518+
519+
cache.on(MapLifecycleEvent.RELEASED, release_callback)
520+
521+
try:
522+
await cache.put("A", "B")
523+
await cache.put("C", "D")
524+
assert await cache.size() == 2
525+
526+
await cache.destroy()
527+
await tests.wait_for(destroy_event, EVENT_TIMEOUT)
528+
await tests.wait_for(release_event, EVENT_TIMEOUT)
529+
530+
assert name == cache.name
531+
assert cache.destroyed
532+
assert cache.released
533+
assert not cache.active
534+
finally:
535+
await session.close()
536+
537+
498538
# noinspection PyShadowingNames,DuplicatedCode,PyUnresolvedReferences
499539
@pytest.mark.asyncio
500540
async def test_add_remove_index(person_cache: NamedCache[str, Person]) -> None:
@@ -603,7 +643,7 @@ async def test_ttl_configuration(test_session: Session) -> None:
603643

604644
@pytest.mark.asyncio
605645
async def test_unary_error(test_session: Session) -> None:
606-
cache: NamedCache[str, str] = await test_session.get_cache("unary_error")
646+
cache: NamedCache[str, dict] = await test_session.get_cache("unary_error")
607647

608648
d = dict()
609649
d["@class"] = "com.foo.Bar"

0 commit comments

Comments
 (0)