Skip to content

[WIP] Add waitSignals. #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 76 additions & 5 deletions pytestqt/_tests/test_wait_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
class Signaller(QtCore.QObject):

signal = Signal()
signal_2 = Signal()


def test_signal_blocker_exception(qtbot):
Expand All @@ -18,21 +19,23 @@ def test_signal_blocker_exception(qtbot):
qtbot.waitSignal(None, None).wait()


def explicit_wait(qtbot, signal, timeout):
def explicit_wait(qtbot, signal, timeout, multiple):
"""
Explicit wait for the signal using blocker API.
"""
blocker = qtbot.waitSignal(signal, timeout)
func = qtbot.waitSignals if multiple else qtbot.waitSignal
blocker = func(signal, timeout)
assert not blocker.signal_triggered
blocker.wait()
return blocker


def context_manager_wait(qtbot, signal, timeout):
def context_manager_wait(qtbot, signal, timeout, multiple):
"""
Waiting for signal using context manager API.
"""
with qtbot.waitSignal(signal, timeout) as blocker:
func = qtbot.waitSignals if multiple else qtbot.waitSignal
with func(signal, timeout) as blocker:
pass
return blocker

Expand All @@ -55,14 +58,15 @@ def test_signal_triggered(qtbot, wait_function, emit_delay, timeout,
the expected results.
"""
signaller = Signaller()

timer = QtCore.QTimer()
timer.setSingleShot(True)
timer.timeout.connect(signaller.signal.emit)
timer.start(emit_delay)

# block signal until either signal is emitted or timeout is reached
start_time = time.time()
blocker = wait_function(qtbot, signaller.signal, timeout)
blocker = wait_function(qtbot, signaller.signal, timeout, multiple=False)

# Check that event loop exited.
assert not blocker._loop.isRunning()
Expand All @@ -78,6 +82,60 @@ def test_signal_triggered(qtbot, wait_function, emit_delay, timeout,
assert time.time() - start_time < (max_wait_ms / 1000.0)


@pytest.mark.parametrize(
('wait_function', 'emit_delay_1', 'emit_delay_2', 'timeout',
'expected_signal_triggered'),
[
(explicit_wait, 500, 600, 2000, True),
(explicit_wait, 500, 600, None, True),
(context_manager_wait, 500, 600, 2000, True),
(context_manager_wait, 500, 600, None, True),
(explicit_wait, 2000, 2000, 500, False),
(explicit_wait, 500, 2000, 1000, False),
(explicit_wait, 2000, 500, 1000, False),
(context_manager_wait, 2000, 2000, 500, False),
(context_manager_wait, 500, 2000, 1000, False),
(context_manager_wait, 2000, 500, 1000, False),
]
)
def test_signal_triggered_multiple(qtbot, wait_function, emit_delay_1,
emit_delay_2, timeout,
expected_signal_triggered):
"""
Testing for a signal in different conditions, ensuring we are obtaining
the expected results.
"""
signaller = Signaller()

timer = QtCore.QTimer()
timer.setSingleShot(True)
timer.timeout.connect(signaller.signal.emit)
timer.start(emit_delay_1)

timer2 = QtCore.QTimer()
timer2.setSingleShot(True)
timer2.timeout.connect(signaller.signal_2.emit)
timer2.start(emit_delay_2)

# block signal until either signal is emitted or timeout is reached
start_time = time.time()
blocker = wait_function(qtbot, [signaller.signal, signaller.signal_2],
timeout, multiple=True)

# Check that event loop exited.
assert not blocker._loop.isRunning()

# ensure that either signal was triggered or timeout occurred
assert blocker.signal_triggered == expected_signal_triggered

# Check that we exited by the earliest parameter; timeout = None means
# wait forever, so ensure we waited at most 4 times emit-delay
if timeout is None:
timeout = max(emit_delay_1, emit_delay_2) * 4
max_wait_ms = max(emit_delay_1, emit_delay_2, timeout)
assert time.time() - start_time < (max_wait_ms / 1000.0)


def test_explicit_emit(qtbot):
"""
Make sure an explicit emit() inside a waitSignal block works.
Expand All @@ -87,3 +145,16 @@ def test_explicit_emit(qtbot):
signaller.signal.emit()

assert waiting.signal_triggered


def test_explicit_emit_multiple(qtbot):
"""
Make sure an explicit emit() inside a waitSignal block works.
"""
signaller = Signaller()
with qtbot.waitSignals([signaller.signal, signaller.signal_2],
timeout=5000) as waiting:
signaller.signal.emit()
signaller.signal_2.emit()

assert waiting.signal_triggered
130 changes: 115 additions & 15 deletions pytestqt/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class QtBot(object):
**Signals**

.. automethod:: waitSignal
.. automethod:: waitSignals

**Raw QTest API**

Expand Down Expand Up @@ -236,7 +237,7 @@ def stopForInteraction(self):
def waitSignal(self, signal=None, timeout=1000):
"""
.. versionadded:: 1.2

Stops current test until a signal is triggered.

Used to stop the control flow of a test until a signal is emitted, or
Expand Down Expand Up @@ -273,26 +274,74 @@ def waitSignal(self, signal=None, timeout=1000):

wait_signal = waitSignal # pep-8 alias

def waitSignals(self, signals=None, timeout=1000):
"""
.. versionadded:: 1.4

Stops current test until all given signals are triggered.

Used to stop the control flow of a test until all given signals are
emitted, or a number of milliseconds, specified by ``timeout``, has
elapsed.

Best used as a context manager::

with qtbot.waitSignals(signal, timeout=1000):
long_function_that_calls_signal()

Also, you can use the :class:`MultiSignalBlocker` directly if the
context manager form is not convenient::

class SignalBlocker(object):
blocker = qtbot.waitSignals(signal, timeout=1000)
blocker.connect(other_signal)
long_function_that_calls_signal()
blocker.wait()

:param list signals:
A list of :class:`Signal`s to wait for. Set to ``None`` to just use
timeout.
:param int timeout:
How many milliseconds to wait before resuming control flow.
:returns:
``MultiSignalBlocker`` object. Call ``MultiSignalBlocker.wait()``
to wait.

.. note::
Cannot have both ``signals`` and ``timeout`` equal ``None``, or
else you will block indefinitely. We throw an error if this occurs.
"""
blocker = MultiSignalBlocker(timeout=timeout)
if signals is not None:
for signal in signals:
blocker.add_signal(signal)
return blocker

wait_signals = waitSignals # pep-8 alias


class AbstractSignalBlocker(object):

"""
Returned by :meth:`QtBot.waitSignal` method.
Base class for :class:`SignalBlocker` and :class:`MultiSignalBlocker`.

.. automethod:: wait
.. automethod:: connect
Provides :meth:`wait` and a context manager protocol, but no means to add
new signals and to detect when the signals should be considered "done".
This needs to be implemented by subclasses.

Subclasses also need to provide ``self._signals`` which should evaluate to
``False`` if no signals were configured.

:ivar int timeout: maximum time to wait for a signal to be triggered. Can
be changed before :meth:`wait` is called.

:ivar bool signal_triggered: set to ``True`` if a signal was triggered, or
``False`` if timeout was reached instead. Until :meth:`wait` is called,
this is set to ``None``.
:ivar bool signal_triggered: set to ``True`` if a signal (or all signals in
case of :class:MultipleSignalBlocker:) was triggered, or ``False`` if
timeout was reached instead. Until :meth:`wait` is called, this is set
to ``None``.
"""

def __init__(self, timeout=1000):
self._loop = QtCore.QEventLoop()
self._signals = []
self.timeout = timeout
self.signal_triggered = False

Expand All @@ -305,12 +354,33 @@ def wait(self):
"""
if self.signal_triggered:
return
if self.timeout is None and len(self._signals) == 0:
if self.timeout is None and not self._signals:
raise ValueError("No signals or timeout specified.")
if self.timeout is not None:
QtCore.QTimer.singleShot(self.timeout, self._loop.quit)
self._loop.exec_()

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self.wait()


class SignalBlocker(AbstractSignalBlocker):

"""
Returned by :meth:`QtBot.waitSignal` method.

.. automethod:: wait
.. automethod:: connect

"""

def __init__(self, timeout=1000):
super(SignalBlocker, self).__init__(timeout)
self._signals = []

def connect(self, signal):
"""
Connects to the given signal, making :meth:`wait()` return once this signal
Expand All @@ -328,11 +398,41 @@ def _quit_loop_by_signal(self):
self.signal_triggered = True
self._loop.quit()

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self.wait()
class MultiSignalBlocker(AbstractSignalBlocker):

"""
Returned by :meth:`QtBot.waitSignals` method.

.. automethod:: wait
.. automethod:: add_signal
"""

def __init__(self, timeout=1000):
super(MultiSignalBlocker, self).__init__(timeout)
self._signals = {}

def add_signal(self, signal):
"""
Adds the given signal to the list of signals which :meth:`wait()` waits
for.

:param signal: QtCore.Signal
"""
self._signals[signal] = False
signal.connect(functools.partial(self._signal_emitted, signal))

def _signal_emitted(self, signal):
"""
Called when a given signal is emitted.

If all expected signals have been emitted, quits the event loop and
marks that we finished because signals.
"""
self._signals[signal] = True
if all(self._signals.values()):
self.signal_triggered = True
self._loop.quit()


@contextmanager
Expand Down Expand Up @@ -435,4 +535,4 @@ def pytest_configure(config):


def pytest_report_header():
return ['qt-api: %s' % QT_API]
return ['qt-api: %s' % QT_API]