Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"In the face of ambiguity refuse the temptation to guess" -- Tim Peters #29

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ install:
# flake8 style checker
- pip install flake8 pep8-naming flake8-debugger flake8-docstrings
script:
- QUAMASH_QTIMPL=PySide py.test
- QUAMASH_QTIMPL=PyQt4 py.test
- QUAMASH_QTIMPL=PyQt5 py.test
- py.test --qtimpl PySide
- py.test --qtimpl PyQt4
- py.test --qtimpl PyQt5
- flake8 --ignore=D1,W191,E501
- flake8 --select=D1 quamash/*.py
cache:
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install:
build: off

test_script:
- "%PYTHON%\\Scripts\\py.test.exe"
- "%PYTHON%\\Scripts\\py.test.exe --qtimpl %QTIMPL%"

notifications:
- provider: Webhook
Expand Down
39 changes: 36 additions & 3 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
import os.path
import logging
from importlib import import_module
from pytest import fixture
sys.path.insert(0, os.path.dirname(__file__))
logging.basicConfig(
Expand All @@ -12,7 +13,39 @@
collect_ignore = ['quamash/_windows.py']


def pytest_addoption(parser):
parser.addoption("--qtimpl", default='guess')


def guess_qtimpl():
for guess in ('PyQt5', 'PyQt4', 'PySide'):
try:
__import__(guess)
except ImportError:
continue
else:
return guess


@fixture(scope='session')
def application():
from quamash import QApplication
return QApplication([])
def application(request):
qtimpl = request.config.getoption('qtimpl')
if qtimpl == 'guess':
qtimpl = guess_qtimpl()
__import__(qtimpl)

for module in ('.QtWidgets', '.QtGui'):
try:
return import_module(module, qtimpl).QApplication([])
except (ImportError, AttributeError):
continue


@fixture(scope='session')
def qtcore(request):
qtimpl = request.config.getoption('qtimpl')
if qtimpl == 'guess':
qtimpl = guess_qtimpl()
__import__(qtimpl)

return import_module('.QtCore', qtimpl)
216 changes: 74 additions & 142 deletions quamash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,117 +12,82 @@
import os
import asyncio
import time
from functools import wraps
import itertools
from queue import Queue
from concurrent.futures import Future
import logging
logger = logging.getLogger('quamash')

try:
QtModuleName = os.environ['QUAMASH_QTIMPL']
except KeyError:
QtModule = None
else:
logger.info('Forcing use of {} as Qt Implementation'.format(QtModuleName))
QtModule = __import__(QtModuleName)

if not QtModule:
for QtModuleName in ('PyQt5', 'PyQt4', 'PySide'):
try:
QtModule = __import__(QtModuleName)
except ImportError:
continue
else:
break
else:
raise ImportError('No Qt implementations found')

logger.info('Using Qt Implementation: {}'.format(QtModuleName))

QtCore = __import__(QtModuleName + '.QtCore', fromlist=(QtModuleName,))
QtGui = __import__(QtModuleName + '.QtGui', fromlist=(QtModuleName,))
if QtModuleName == 'PyQt5':
from PyQt5 import QtWidgets
QApplication = QtWidgets.QApplication
else:
QApplication = QtGui.QApplication

if not hasattr(QtCore, 'Signal'):
QtCore.Signal = QtCore.pyqtSignal

from importlib import import_module
import warnings

from ._common import with_logger

if 'QUAMASH_QTIMPL' in os.environ:
warnings.warn("QUAMASH_QTIMPL environment variable set, this version of quamash ignores it.")

@with_logger
class _QThreadWorker(QtCore.QThread):

"""
Read from the queue.

For use by the QThreadExecutor
"""

def __init__(self, queue, num):
self.__queue = queue
self.__stop = False
self.__num = num
super().__init__()
def _run_in_worker(queue, num, logger):
while True:
command = queue.get()
if command is None:
# Stopping...
break

def run(self):
queue = self.__queue
while True:
command = queue.get()
if command is None:
# Stopping...
break

future, callback, args, kwargs = command
self._logger.debug(
'#{} got callback {} with args {} and kwargs {} from queue'
.format(self.__num, callback, args, kwargs)
)
if future.set_running_or_notify_cancel():
self._logger.debug('Invoking callback')
try:
r = callback(*args, **kwargs)
except Exception as err:
self._logger.debug('Setting Future exception: {}'.format(err))
future.set_exception(err)
else:
self._logger.debug('Setting Future result: {}'.format(r))
future.set_result(r)
future, callback, args, kwargs = command
logger.debug(
'#{} got callback {} with args {} and kwargs {} from queue'
.format(num, callback, args, kwargs)
)
if future.set_running_or_notify_cancel():
logger.debug('Invoking callback')
try:
r = callback(*args, **kwargs)
except Exception as err:
logger.debug('Setting Future exception: {}'.format(err))
future.set_exception(err)
else:
self._logger.debug('Future was canceled')

self._logger.debug('Thread #{} stopped'.format(self.__num))
logger.debug('Setting Future result: {}'.format(r))
future.set_result(r)
else:
logger.debug('Future was canceled')

def wait(self):
self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num))
super().wait()
logger.debug('Thread #{} stopped'.format(num))


@with_logger
class QThreadExecutor(QtCore.QObject):
class QThreadExecutor:

"""
ThreadExecutor that produces QThreads.

Same API as `concurrent.futures.Executor`

>>> from quamash import QThreadExecutor
>>> with QThreadExecutor(5) as executor:
>>> QtCore = getfixture('qtcore')
>>> with QThreadExecutor(QtCore, 5) as executor:
... f = executor.submit(lambda x: 2 + x, 2)
... r = f.result()
... assert r == 4
"""

def __init__(self, max_workers=10, parent=None):
super().__init__(parent)
def __init__(self, qtcore, max_workers=10):
super().__init__()

@with_logger
class QThreadWorker(qtcore.QThread):
def __init__(self, queue, num):
super().__init__()
self.__queue = queue
self.__num = num

def run(self):
_run_in_worker(self.__queue, self.__num, self._logger)

def wait(self):
self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num))
super().wait()

self.__max_workers = max_workers
self.__queue = Queue()
self.__workers = [_QThreadWorker(self.__queue, i + 1) for i in range(max_workers)]
self.__workers = [QThreadWorker(self.__queue, i + 1) for i in range(max_workers)]
self.__been_shutdown = False

for w in self.__workers:
Expand Down Expand Up @@ -165,56 +130,16 @@ def __exit__(self, *args):
self.shutdown()


def _easycallback(fn):
"""
Decorator that wraps a callback in a signal.

It also packs & unpacks arguments, and makes the wrapped function effectively
threadsafe. If you call the function from one thread, it will be executed in
the thread the QObject has affinity with.

Remember: only objects that inherit from QObject can support signals/slots

>>> import asyncio
>>>
>>> import quamash
>>> QThread, QObject = quamash.QtCore.QThread, quamash.QtCore.QObject
>>>
>>> app = getfixture('application')
>>>
>>> global_thread = QThread.currentThread()
>>> class MyObject(QObject):
... @_easycallback
... def mycallback(self):
... global global_thread, mythread
... cur_thread = QThread.currentThread()
... assert cur_thread is not global_thread
... assert cur_thread is mythread
>>>
>>> mythread = QThread()
>>> mythread.start()
>>> myobject = MyObject()
>>> myobject.moveToThread(mythread)
>>>
>>> @asyncio.coroutine
... def mycoroutine():
... myobject.mycallback()
>>>
>>> loop = QEventLoop(app)
>>> asyncio.set_event_loop(loop)
>>> with loop:
... loop.run_until_complete(mycoroutine())
"""
@wraps(fn)
def in_wrapper(self, *args, **kwargs):
return signaler.signal.emit(self, args, kwargs)
def _make_signaller(qtimpl_qtcore, *args):
try:
signal_class = qtimpl_qtcore.Signal
except AttributeError:
signal_class = qtimpl_qtcore.pyqtSignal

class Signaler(QtCore.QObject):
signal = QtCore.Signal(object, tuple, dict)
class Signaller(qtimpl_qtcore.QObject):
signal = signal_class(*args)

signaler = Signaler()
signaler.signal.connect(lambda self, args, kwargs: fn(self, *args, **kwargs))
return in_wrapper
return Signaller()


if os.name == 'nt':
Expand All @@ -231,6 +156,9 @@ class QEventLoop(_baseclass):
"""
Implementation of asyncio event loop that uses the Qt Event loop.

Parameters:
:app: Any instance of QApplication

>>> import asyncio
>>>
>>> app = getfixture('application')
Expand All @@ -247,20 +175,25 @@ class QEventLoop(_baseclass):
... loop.run_until_complete(xplusy(2, 2))
"""

def __init__(self, app=None):
def __init__(self, app):
self.__timers = []
self.__app = app or QApplication.instance()
assert self.__app is not None, 'No QApplication has been instantiated'
if app is None:
raise ValueError("app must be an instance of QApplication")
self.__app = app
self.__is_running = False
self.__debug_enabled = False
self.__default_executor = None
self.__exception_handler = None
self._read_notifiers = {}
self._write_notifiers = {}

assert self.__app is not None
qtcore = import_module('..QtCore', type(app).__module__)

super().__init__()
super().__init__(qtcore)

self.__call_soon_signaller = signaller = _make_signaller(self._qtcore, object, tuple)
self.__call_soon_signal = signaller.signal
signaller.signal.connect(lambda callback, args: self.call_soon(callback, *args))

def run_forever(self):
"""Run eventloop forever."""
Expand Down Expand Up @@ -352,7 +285,7 @@ def upon_timeout():
handle._run()

self._logger.debug('Adding callback {} with delay {}'.format(handle, delay))
timer = QtCore.QTimer(self.__app)
timer = self._qtcore.QTimer(self.__app)
timer.timeout.connect(upon_timeout)
timer.setSingleShot(True)
timer.start(delay * 1000)
Expand Down Expand Up @@ -384,7 +317,7 @@ def add_reader(self, fd, callback, *args):
existing.activated.disconnect()
# will get overwritten by the assignment below anyways

notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read)
notifier = self._qtcore.QSocketNotifier(fd, self._qtcore.QSocketNotifier.Read)
notifier.setEnabled(True)
self._logger.debug('Adding reader callback for file descriptor {}'.format(fd))
notifier.activated.connect(
Expand Down Expand Up @@ -416,7 +349,7 @@ def add_writer(self, fd, callback, *args):
existing.activated.disconnect()
# will get overwritten by the assignment below anyways

notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Write)
notifier = self._qtcore.QSocketNotifier(fd, self._qtcore.QSocketNotifier.Write)
notifier.setEnabled(True)
self._logger.debug('Adding writer callback for file descriptor {}'.format(fd))
notifier.activated.connect(
Expand Down Expand Up @@ -472,10 +405,9 @@ def __on_notifier_ready(self, notifiers, notifier, fd, callback, args):

# Methods for interacting with threads.

@_easycallback
def call_soon_threadsafe(self, callback, *args):
"""Thread-safe version of call_soon."""
self.call_soon(callback, *args)
self.__call_soon_signal.emit(callback, args)

def run_in_executor(self, executor, callback, *args):
"""Run callback in executor.
Expand All @@ -496,7 +428,7 @@ def run_in_executor(self, executor, callback, *args):
executor = executor or self.__default_executor
if executor is None:
self._logger.debug('Creating default executor')
executor = self.__default_executor = QThreadExecutor()
executor = self.__default_executor = QThreadExecutor(self._qtcore)
self._logger.debug('Using default executor')

return asyncio.wrap_future(executor.submit(callback, *args))
Expand Down
Loading