Description
Problem
I came across the following situation today while writing a PySide test. I have a data analysis GUI that calls out to an external application in a new process (using QProcess) to run a simulation. I set out to write an integration test that runs the simulation, waits for it to complete, and then tests the output files.
I tried many things, including a lot of Python time and threading functions, but they all blocked the main thread, which blocked the GUI and my call to the external process. I figured out that you can call QtCore.QEventLoop.exec_() to create a nested event loop. That is, qtbot has a QApplication instance already running, but calling QtCore.QEventLoop.exec_() will stop execution using a new event loop until QtCore.QEventLoop.quit() is called.
Here is an example test:
def test_run_simulation(qtbot):
    gui = MainGUI()
    qtbot.addWidget(gui)
    # Set up loop and quit slot
    loop = QtCore.QEventLoop()
    gui.worker.finished.connect(loop.quit)
    gui.worker.start()  # Begin long operation
    loop.exec_()  # Execution stops here until signal is emitted
    assert_valid_data(gui.worker.results)  # Worker is done. Check results.
    ...
My feature request is this: abstract this functionality and put it in pytest-qt.
Solution
Here is one solution I found. First we define the context manager block_until_emit:
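from contextlib import contextmanager

from PySide import QtCore

@contextmanager
def block_until_emit(signal, timeout=10000):
    """Block loop until signal emitted, or timeout (ms) elapses."""
    loop = QtCore.QEventLoop()
    signal.connect(loop.quit)
    yield  # The body of the with block runs here
    if timeout is not None:
        QtCore.QTimer.singleShot(timeout, loop.quit)
    loop.exec_()  # Blocks until the signal or the timer calls loop.quit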
This context manager can be used like this:
with block_until_emit(gui.worker.finished, 10000):
    gui.worker.start()
Implementation
I think we can add this as a method to qtbot, so we use it as qtbot.block_until_emit. We can also keep it as a top-level definition in the pytestqt namespace and have users import it, since it doesn't need information from the current qtbot.
Also, an arbitrary number of signals can be passed in, and all of them can be connected to loop.quit. We can do something like this:
@contextmanager
def block_until_emit(signals, timeout=10000):
    # Accept either a single signal or a list of signals
    if not isinstance(signals, list):
        signals = [signals]
    loop = QtCore.QEventLoop()
    for signal in signals:
        signal.connect(loop.quit)
    ...
Also, users may not want to use this as a context manager. We could instead return an instance of a class that defines __enter__ and __exit__. Then we could use this as a plain function call too.
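To make the class-based idea concrete, here is a rough sketch of what such an object might look like. The names SignalBlocker, connect and wait are hypothetical, not an existing pytest-qt API. So that the sketch runs without Qt installed, the loop and the single-shot timer are passed in, and FakeLoop / FakeSignal are stand-ins written only for this illustration (FakeLoop.exec_ does not actually block). In a real test you would presumably pass QtCore.QEventLoop() and QtCore.QTimer.singleShot instead.

```python
class SignalBlocker(object):
    """Hypothetical helper: waits in a nested event loop until quit is called.

    Works as a context manager (waits on exit) or via an explicit wait() call.
    """

    def __init__(self, loop, single_shot, timeout=10000):
        # loop: object with exec_() and quit(), e.g. QtCore.QEventLoop()
        # single_shot: callable(ms, slot), e.g. QtCore.QTimer.singleShot
        self._loop = loop
        self._single_shot = single_shot
        self.timeout = timeout

    def connect(self, signal):
        # Any connected signal ends the wait by quitting the loop
        signal.connect(self._loop.quit)

    def wait(self):
        if self.timeout is not None:
            self._single_shot(self.timeout, self._loop.quit)
        self._loop.exec_()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Only wait if the with block finished without raising
        if exc_type is None:
            self.wait()
        return False


class FakeSignal(object):
    """Stand-in for a Qt signal (assumption for this sketch only)."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        self._slots.append(slot)

    def emit(self):
        for slot in list(self._slots):
            slot()


class FakeLoop(object):
    """Stand-in for QEventLoop that records calls instead of blocking."""

    def __init__(self):
        self.quit_called = False
        self.exec_called = False

    def quit(self):
        self.quit_called = True

    def exec_(self):
        self.exec_called = True


# Context manager style
finished = FakeSignal()
blocker = SignalBlocker(FakeLoop(), lambda ms, slot: None, timeout=500)
blocker.connect(finished)
with blocker:
    finished.emit()  # stands in for gui.worker.start() eventually finishing

# Plain call style
other = SignalBlocker(FakeLoop(), lambda ms, slot: None, timeout=None)
other.wait()
```

With this shape, the same object covers both styles: the with block defers the wait to __exit__, while code that does not want a context manager calls wait() directly.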
Lastly, users may not want to connect a signal. They may just want to use the timeout feature. In that case, we can make signals=None by default and use a timer (making sure timeout and signals are not both None).
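The argument handling for that could look roughly like the sketch below. The helper name normalize_wait_args is made up for illustration, and accepting tuples as well as lists is my own assumption. It only covers the argument checks, not the event loop itself.

```python
def normalize_wait_args(signals=None, timeout=10000):
    """Return (list_of_signals, timeout), rejecting the case where both are None.

    Hypothetical helper for illustration; not part of pytest-qt.
    """
    if signals is None and timeout is None:
        # Nothing could ever quit the loop, so the test would hang forever
        raise ValueError("signals and timeout cannot both be None")
    if signals is None:
        signals = []  # timeout-only wait
    elif not isinstance(signals, (list, tuple)):
        signals = [signals]  # a single signal was passed
    return list(signals), timeout
```

block_until_emit could call this first, connect every returned signal to loop.quit, and start the timer only when the timeout is not None.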
Let me know what you think!