Skip to content

chore: Deal with threading related issues that caused flakinnes #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions tests/unit/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import logging
import threading
import time
from collections.abc import AsyncIterator, Generator, Iterator
from datetime import datetime, timedelta
Expand Down Expand Up @@ -65,6 +66,8 @@

@pytest.fixture
def mock_api() -> None:
test_server_lock = threading.Lock()

def get_responses() -> Generator[httpx.Response, None, None]:
"""Simulate actor run that changes status 3 times."""
for _ in range(5):
Expand Down Expand Up @@ -116,8 +119,11 @@ def get_responses() -> Generator[httpx.Response, None, None]:
responses = get_responses()

def actor_runs_side_effect(_: httpx.Request) -> httpx.Response:
time.sleep(0.1)
return next(responses)
test_server_lock.acquire()
# To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing`
response = next(responses)
test_server_lock.release_lock()
return response

respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect)

Expand Down Expand Up @@ -212,10 +218,10 @@ async def test_redirected_logs_async(
# Do stuff while the log from the other Actor is being redirected to the logs.
await asyncio.sleep(2)

assert len(caplog.records) == expected_log_count
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
# Ensure logs are propagated
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:]
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -250,10 +256,10 @@ def test_redirected_logs_sync(
# Do stuff while the log from the other Actor is being redirected to the logs.
time.sleep(2)

assert len(caplog.records) == expected_log_count
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
# Ensure logs are propagated
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:]
)


@respx.mock
Expand All @@ -278,10 +284,9 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
assert isinstance(logger.handlers[0], logging.StreamHandler)

# Ensure logs are propagated
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
)


@respx.mock
Expand All @@ -306,10 +311,9 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
assert isinstance(logger.handlers[0], logging.StreamHandler)

# Ensure logs are propagated
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
)


@respx.mock
Expand Down Expand Up @@ -357,10 +361,10 @@ async def test_actor_call_redirect_logs_to_custom_logger_async(
with caplog.at_level(logging.DEBUG, logger=logger_name):
await actor_client.call(logger=logger)

assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
# Ensure logs are propagated
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
)


@respx.mock
Expand All @@ -378,10 +382,10 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
with caplog.at_level(logging.DEBUG, logger=logger_name):
actor_client.call(logger=logger)

assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES)
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records):
assert expected_message_and_level[0] == record.message
assert expected_message_and_level[1] == record.levelno
# Ensure logs are propagated
assert {(record.message, record.levelno) for record in caplog.records} == set(
_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES
)


@respx.mock
Expand Down