Skip to content

Commit 7a1c618

Browse files
authored
chore: Deal with threading related issues that caused flakinnes (#420)
- Add lock to prevent access to already running generator - Compare sets to avoid order related non determinism caused by threads
1 parent fba5354 commit 7a1c618

File tree

1 file changed

+30
-26
lines changed

1 file changed

+30
-26
lines changed

tests/unit/test_logging.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import json
33
import logging
4+
import threading
45
import time
56
from collections.abc import AsyncIterator, Generator, Iterator
67
from datetime import datetime, timedelta
@@ -65,6 +66,8 @@
6566

6667
@pytest.fixture
6768
def mock_api() -> None:
69+
test_server_lock = threading.Lock()
70+
6871
def get_responses() -> Generator[httpx.Response, None, None]:
6972
"""Simulate actor run that changes status 3 times."""
7073
for _ in range(5):
@@ -116,8 +119,11 @@ def get_responses() -> Generator[httpx.Response, None, None]:
116119
responses = get_responses()
117120

118121
def actor_runs_side_effect(_: httpx.Request) -> httpx.Response:
119-
time.sleep(0.1)
120-
return next(responses)
122+
test_server_lock.acquire()
123+
# To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing`
124+
response = next(responses)
125+
test_server_lock.release_lock()
126+
return response
121127

122128
respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect)
123129

@@ -212,10 +218,10 @@ async def test_redirected_logs_async(
212218
# Do stuff while the log from the other Actor is being redirected to the logs.
213219
await asyncio.sleep(2)
214220

215-
assert len(caplog.records) == expected_log_count
216-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
217-
assert expected_message_and_level[0] == record.message
218-
assert expected_message_and_level[1] == record.levelno
221+
# Ensure logs are propagated
222+
assert {(record.message, record.levelno) for record in caplog.records} == set(
223+
_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:]
224+
)
219225

220226

221227
@pytest.mark.parametrize(
@@ -250,10 +256,10 @@ def test_redirected_logs_sync(
250256
# Do stuff while the log from the other Actor is being redirected to the logs.
251257
time.sleep(2)
252258

253-
assert len(caplog.records) == expected_log_count
254-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
255-
assert expected_message_and_level[0] == record.message
256-
assert expected_message_and_level[1] == record.levelno
259+
# Ensure logs are propagated
260+
assert {(record.message, record.levelno) for record in caplog.records} == set(
261+
_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:]
262+
)
257263

258264

259265
@respx.mock
@@ -278,10 +284,9 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
278284
assert isinstance(logger.handlers[0], logging.StreamHandler)
279285

280286
# Ensure logs are propagated
281-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
282-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
283-
assert expected_message_and_level[0] == record.message
284-
assert expected_message_and_level[1] == record.levelno
287+
assert {(record.message, record.levelno) for record in caplog.records} == set(
288+
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
289+
)
285290

286291

287292
@respx.mock
@@ -306,10 +311,9 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
306311
assert isinstance(logger.handlers[0], logging.StreamHandler)
307312

308313
# Ensure logs are propagated
309-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
310-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
311-
assert expected_message_and_level[0] == record.message
312-
assert expected_message_and_level[1] == record.levelno
314+
assert {(record.message, record.levelno) for record in caplog.records} == set(
315+
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
316+
)
313317

314318

315319
@respx.mock
@@ -357,10 +361,10 @@ async def test_actor_call_redirect_logs_to_custom_logger_async(
357361
with caplog.at_level(logging.DEBUG, logger=logger_name):
358362
await actor_client.call(logger=logger)
359363

360-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
361-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
362-
assert expected_message_and_level[0] == record.message
363-
assert expected_message_and_level[1] == record.levelno
364+
# Ensure logs are propagated
365+
assert {(record.message, record.levelno) for record in caplog.records} == set(
366+
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
367+
)
364368

365369

366370
@respx.mock
@@ -378,10 +382,10 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
378382
with caplog.at_level(logging.DEBUG, logger=logger_name):
379383
actor_client.call(logger=logger)
380384

381-
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
382-
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
383-
assert expected_message_and_level[0] == record.message
384-
assert expected_message_and_level[1] == record.levelno
385+
# Ensure logs are propagated
386+
assert {(record.message, record.levelno) for record in caplog.records} == set(
387+
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
388+
)
385389

386390

387391
@respx.mock

0 commit comments

Comments
 (0)