-
Notifications
You must be signed in to change notification settings - Fork 44
Add ThreadPoolExecutor to workflowrunner #806
Conversation
967748f
to
42a4a57
Compare
|
||
def run(self): | ||
try: | ||
if self._target: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this not be done by super
instead? I.e. super(WorkflowRunnerThread, self).run(self)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss this - need to know an opinion on this one
super(WorkflowRunnerThread, self).__init__(target=target, name=name) | ||
self.setDaemon(True) | ||
self._exception = None | ||
# in python2.7 it's called __target therefore we need to inject this too |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference of Python2.7 vs Python3.3+ is thread.__target
vs thread._target
; therefore we set an attribute thread._target
(which is tackled by init for Python3.3+)
I still don't see how this PR is resolving the given issue. It solves the problem in the tests, but that is more by changing the test, then adopting the code at large (which I find a bit scary). I'll have try to have a closer look at the usage of the WorkflowRunner and come back to you with something more concrete ;) |
self.__workflow.wait() | ||
if self.runner_thread: | ||
self.runner_thread.join() | ||
print('DBG wait finnished with following exception', self.runner_thread._exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if not self.runner_thread ? from what i understand your print will fail when calling _exception on it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, should be inside the if statement
d26b3a5
to
765a198
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. It fixes the potential leaking of threads during testing. We should look into how Ert is using this and protect the execution in a similar manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to have another discussion regarding this PR before we merge it when Julius is back in the office :)
cb54f89
to
cc664d8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beautiful 👏 Can you evaluate the usage in ERT and make an issue for changing the usage of WorkflowRunner
in ERT if that should be done?
workflow_thread.setDaemon(True) | ||
workflow_thread.run = self.__runWorkflow | ||
workflow_thread.start() | ||
#should we check if it's already running? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If that is intended usage then please go ahead ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should remove the comment and make a decision as to whether you want to check this or not ;) I would lean towards the later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment removed!
@@ -20,12 +20,20 @@ def __init__(self, workflow, ert=None, context=None): | |||
|
|||
self.__context = context | |||
self.__workflow_result = None | |||
self.__workflow_executor = ThreadPoolExecutor(max_workers=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double under scored prefix is reserved for Python. Please use single underscore instead 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, shall I change the existing ones too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new issue spawned for existing ones #861
from res.job_queue import Workflow | ||
from res.util.substitution_list import SubstitutionList | ||
|
||
from concurrent.futures import ThreadPoolExecutor | ||
from concurrent import futures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need both of these imports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, will be gone ... soon :)
Do we need to update the requirements file and the Komodo dependencies of libres? |
Can one of the admins verify this patch? |
No, we shouldn't. |
cc664d8
to
4e41c22
Compare
Are you sure? I don't think |
|
4e41c22
to
67737f7
Compare
f5aed36
to
e0416e3
Compare
workflow_thread.setDaemon(True) | ||
workflow_thread.run = self.__runWorkflow | ||
workflow_thread.start() | ||
self._workflow_job = self._workflow_executor.submit(self.__runWorkflow) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we not leaking a thread now if we call run twice and then cancel arbitrary many times? If so, I would say that we should raise an Exception if run
is called twice and it is clear misuse of the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thread should not leak as it is the part of ThreadPool, i.e., just the job submitted twice - of-course it shouldn't be done for different reasons, so yes we can raise an exception here.
e0416e3
to
d33ad72
Compare
workflow_thread.run = self.__runWorkflow | ||
workflow_thread.start() | ||
if self.isRunning(): | ||
raise Exception('An instance of workflow is already running!') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never raw exceptions, perhaps an AssertionError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. Was browsing around in libres and found raw Exception still ....
- ThreadPoolExecutor replacing Thread in workflow runner - mock worflow.run to throw exception - create ThreadPoolExecutor in __init__ - replacing underscores and clean up - add futures to requirements - raise AssertionError if workflow is running when starting
d33ad72
to
ac8b37b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job 👍
Can you create a new tag on master listing that a new dependency is added after this is merged and add this to the dependency list for master and the new tag in komodo-releases?
Issue
Resolves #802
Approach
Add ThreadPoolExecutor as an attribute to workflowrunner.
Exception is provided workflowrunner.exception()