Open
Description
Version:
- redis-py: 4.5.4
- redis: 6.0.2
- python 3.11.3
Platform:
- macOS
Description:
redis.py:
import redis.asyncio as aioredis
# Connection pool built once, at import time, and handed to every client that
# this module creates.
# NOTE(review): `url` is not defined in this snippet; presumably a parsed Redis
# URL (e.g. the result of urllib.parse.urlparse) -- confirm.
# NOTE(review): because the pool lives at module level, connections it caches
# may later be reused from a different event loop than the one that opened
# them -- verify whether that is intended.
pool = aioredis.ConnectionPool(
    host=url.hostname,  # type: ignore
    port=url.port,  # type: ignore
    username=url.username,
    password=url.password,
    # Every connection created by the pool is an SSLConnection.
    connection_class=aioredis.SSLConnection,
    # NOTE(review): None presumably turns off server-certificate
    # verification -- confirm against the redis-py documentation.
    ssl_cert_reqs=None,
)
def get_connection_async_redis():
    """Build an async Redis client on top of the shared module-level pool.

    Returns:
        An ``aioredis.Redis`` client bound to ``pool``, or ``None`` if
        constructing the client raised ``IOError`` (the error is logged).
    """
    try:
        # auto_close_connection_pool=False is passed so that closing one
        # client is not meant to tear down the pool shared by the others.
        client = aioredis.Redis(
            connection_pool=pool,
            auto_close_connection_pool=False,
        )
    except IOError:
        # NOTE(review): callers receive None here and must cope with it.
        _LOG.exception("Error when connecting to redis...")
        return None
    else:
        return client
cache.py:
async def clear_cache(pattern: str) -> None:
    """Delete every Redis key whose name contains ``pattern``.

    Matching is a plain substring test, not a glob: ``"key"`` matches
    ``"key"``, ``"mykey"`` and ``"key:1"``.

    Args:
        pattern: Substring to look for in each key name.
    """
    conn = get_connection_async_redis()
    # redis-py returns keys as ``bytes`` unless the client was created with
    # ``decode_responses=True``, and ``str in bytes`` raises TypeError, so
    # keep a needle of each type and pick the one matching the key.
    pattern_bytes = pattern.encode("utf-8")
    try:
        for key in await conn.keys():
            if isinstance(key, (bytes, bytearray)):
                needle = pattern_bytes
            else:
                needle = pattern
            if needle in key:
                await conn.delete(key)
    finally:
        # Close the client even when KEYS or DEL fails part-way through.
        await conn.close()
unit_test.py:
from unittest import IsolatedAsyncioTestCase, main, mock
class TestPublicHolidaysDataRetrieval(IsolatedAsyncioTestCase):
    """Async test case that clears matching cache keys before each test."""

    async def asyncSetUp(self) -> None:
        # Runs ahead of each test method and drops every cached key whose
        # name contains "key".
        # NOTE(review): IsolatedAsyncioTestCase presumably runs each test
        # method on its own event loop, so this call happens once per loop
        # -- confirm against the unittest documentation.
        await clear_cache("key")

    # Test bodies are elided in the report; two or more test methods are
    # needed to reproduce the error.
    async def test_func_1(self) -> None:
        ...

    async def test_func_2(self) -> None:
        ...
# Run the unittest command-line runner only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
When you run the unit tests (make sure the test case contains more than one test method), the following error is raised:
...got Future <Future pending created at /usr/local/lib/python3.11/asyncio/base_events.py:427> attached to a different loop
...
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed