Skip to content

Commit

Permalink
replase get_event_loop wite get_running_loop (#2530)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvora-h authored Jan 5, 2023
1 parent f14ed1f commit e67d15c
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 20 deletions.
4 changes: 2 additions & 2 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def __del__(self, _warnings: Any = warnings) -> None:
f"Unclosed client session {self!r}", ResourceWarning, source=self
)
context = {"client": self, "message": self._DEL_MESSAGE}
asyncio.get_event_loop().call_exception_handler(context)
asyncio.get_running_loop().call_exception_handler(context)

async def close(self, close_connection_pool: Optional[bool] = None) -> None:
"""
Expand Down Expand Up @@ -798,7 +798,7 @@ async def check_health(self):

if (
conn.health_check_interval
and asyncio.get_event_loop().time() > conn.next_health_check
and asyncio.get_running_loop().time() > conn.next_health_check
):
await conn.send_command(
"PING", self.HEALTH_CHECK_MESSAGE, check_health=False
Expand Down
2 changes: 1 addition & 1 deletion redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def repr_pieces(self):
def __del__(self):
try:
if self.is_connected:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
coro = self.disconnect()
if loop.is_running():
loop.create_task(coro)
Expand Down
4 changes: 2 additions & 2 deletions redis/asyncio/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ async def acquire(
blocking_timeout = self.blocking_timeout
stop_trying_at = None
if blocking_timeout is not None:
stop_trying_at = asyncio.get_event_loop().time() + blocking_timeout
stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout
while True:
if await self.do_acquire(token):
self.local.token = token
return True
if not blocking:
return False
next_try_at = asyncio.get_event_loop().time() + sleep
next_try_at = asyncio.get_running_loop().time() + sleep
if stop_trying_at is not None and next_try_at > stop_trying_at:
return False
await asyncio.sleep(sleep)
Expand Down
14 changes: 7 additions & 7 deletions tests/test_asyncio/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ async def test_connection_pool_blocks_until_timeout(self, master_host):
) as pool:
c1 = await pool.get_connection("_")

start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
with pytest.raises(redis.ConnectionError):
await pool.get_connection("_")
# we should have waited at least 0.1 seconds
assert asyncio.get_event_loop().time() - start >= 0.1
assert asyncio.get_running_loop().time() - start >= 0.1
await c1.disconnect()

async def test_connection_pool_blocks_until_conn_available(self, master_host):
Expand All @@ -265,9 +265,9 @@ async def target():
await asyncio.sleep(0.1)
await pool.release(c1)

start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
await asyncio.gather(target(), pool.get_connection("_"))
assert asyncio.get_event_loop().time() - start >= 0.1
assert asyncio.get_running_loop().time() - start >= 0.1

async def test_reuse_previously_released_connection(self, master_host):
connection_kwargs = {"host": master_host}
Expand Down Expand Up @@ -668,20 +668,20 @@ async def r(self, create_redis):
await redis.flushall()

def assert_interval_advanced(self, connection):
diff = connection.next_health_check - asyncio.get_event_loop().time()
diff = connection.next_health_check - asyncio.get_running_loop().time()
assert self.interval >= diff > (self.interval - 1)

async def test_health_check_runs(self, r):
if r.connection:
r.connection.next_health_check = asyncio.get_event_loop().time() - 1
r.connection.next_health_check = asyncio.get_running_loop().time() - 1
await r.connection.check_health()
self.assert_interval_advanced(r.connection)

async def test_arbitrary_command_invokes_health_check(self, r):
# invoke a command to make sure the connection is entirely setup
if r.connection:
await r.get("foo")
r.connection.next_health_check = asyncio.get_event_loop().time()
r.connection.next_health_check = asyncio.get_running_loop().time()
with mock.patch.object(
r.connection, "send_command", wraps=r.connection.send_command
) as m:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_asyncio/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ async def test_high_sleep_small_blocking_timeout(self, r):
sleep = 60
bt = 1
lock2 = self.get_lock(r, "foo", sleep=sleep, blocking_timeout=bt)
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
assert not await lock2.acquire()
# the elapsed timed is less than the blocking_timeout as the lock is
# unattainable given the sleep/blocking_timeout configuration
assert bt > (asyncio.get_event_loop().time() - start)
assert bt > (asyncio.get_running_loop().time() - start)
await lock1.release()

async def test_releasing_unlocked_lock_raises_error(self, r):
Expand Down
12 changes: 6 additions & 6 deletions tests/test_asyncio/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def run(*args, **kwargs):


async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False):
now = asyncio.get_event_loop().time()
now = asyncio.get_running_loop().time()
timeout = now + timeout
while now < timeout:
message = await pubsub.get_message(
Expand All @@ -39,7 +39,7 @@ async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False)
if message is not None:
return message
await asyncio.sleep(0.01)
now = asyncio.get_event_loop().time()
now = asyncio.get_running_loop().time()
return None


Expand Down Expand Up @@ -675,7 +675,7 @@ async def loop_step():
await messages.put(message)
break

task = asyncio.get_event_loop().create_task(loop())
task = asyncio.get_running_loop().create_task(loop())
# get the initial connect message
async with async_timeout.timeout(1):
message = await messages.get()
Expand Down Expand Up @@ -724,7 +724,7 @@ def callback(message):
messages = asyncio.Queue()
p = pubsub
await self._subscribe(p, foo=callback)
task = asyncio.get_event_loop().create_task(p.run())
task = asyncio.get_running_loop().create_task(p.run())
await r.publish("foo", "bar")
message = await messages.get()
task.cancel()
Expand All @@ -748,7 +748,7 @@ def exception_handler_callback(e, pubsub) -> None:
p = pubsub
await self._subscribe(p, foo=lambda x: None)
with mock.patch.object(p, "get_message", side_effect=Exception("error")):
task = asyncio.get_event_loop().create_task(
task = asyncio.get_running_loop().create_task(
p.run(exception_handler=exception_handler_callback)
)
e = await exceptions.get()
Expand All @@ -765,7 +765,7 @@ def callback(message):

messages = asyncio.Queue()
p = pubsub
task = asyncio.get_event_loop().create_task(p.run())
task = asyncio.get_running_loop().create_task(p.run())
# wait until loop gets settled. Add a subscription
await asyncio.sleep(0.1)
await p.subscribe(foo=callback)
Expand Down

0 comments on commit e67d15c

Please sign in to comment.