Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: add active Python versions to test matrix #37

Merged
merged 8 commits into from
Mar 7, 2022
Merged
Prev Previous commit
Next Next commit
chore: replace magic wait time with semaphore
  • Loading branch information
edeckers committed Mar 7, 2022
commit 6f6ed31a54f3e3e1b4a848b1d4c22c2fe10a3a8a
32 changes: 19 additions & 13 deletions src/tests/test_api_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import tempfile
import threading
import unittest
from time import sleep

from huemon.api.api import Api
from huemon.api.api_factory import create_api, create_hue_hub_url
Expand Down Expand Up @@ -36,29 +35,36 @@ def test_when_cache_disabled_return_regular(self):
self.assertIsInstance(api, Api)

@staticmethod
def __wait(timeout_seconds: int, value: dict):
sleep(timeout_seconds)
def __wait(mutex: threading.Lock, is_started: threading.Event):
is_started.set()

value["result"] = True
with mutex:
pass

def test_when_locked_return_none(self):
(lfd, lock_file_path) = tempfile.mkstemp()

thread_result = {"result": False}
target = lambda: run_locked(
lock_file_path, lambda: TestApiConfiguration.__wait(1, thread_result)
)
mutex = threading.Lock()

with mutex:
is_started_event = threading.Event()
target = lambda: run_locked(
lock_file_path,
lambda: TestApiConfiguration.__wait(mutex, is_started_event),
)

thread0 = threading.Thread(target=target)

thread0.start()
is_started_event.wait()

thread0 = threading.Thread(target=target)
maybe_true = run_locked(lock_file_path, lambda: True)

thread0.start()
maybe_true = run_locked(lock_file_path, lambda: True)
thread0.join()
thread0.join(5)

os.close(lfd)

self.assertIsNone(maybe_true)
self.assertEqual(True, thread_result["result"])

def test_when_locked_raises_exception_return_none(self):
(lfd, lock_file_path) = tempfile.mkstemp()
Expand Down