This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Add ThreadPoolExecutor to workflowrunner #806

Merged
merged 1 commit into equinor:master from the workflow_runner_thread branch
Jan 13, 2020

Conversation

xjules
Contributor

@xjules xjules commented Nov 20, 2019

Issue
Resolves #802

Approach
Add ThreadPoolExecutor as an attribute to workflowrunner.
Any exception raised by the workflow is exposed via workflowrunner.exception().
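
A minimal sketch of this approach, under stated assumptions: `SketchRunner`, `run()`, `wait()` and `failing_job` are hypothetical names for illustration and are not the libres API. Only the idea of holding a `ThreadPoolExecutor` as an attribute and exposing an `exception()` accessor comes from the description above.

```python
from concurrent.futures import ThreadPoolExecutor


class SketchRunner:
    """Hypothetical stand-in for the workflow runner; not the libres class."""

    def __init__(self, job):
        self._job = job  # callable that does the actual work
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._future = None

    def run(self):
        # submit() returns a Future that records the result or the exception
        self._future = self._executor.submit(self._job)

    def wait(self):
        # shutdown(wait=True) blocks until the submitted job has finished
        self._executor.shutdown(wait=True)

    def exception(self):
        # Future.exception() returns None if the job finished without raising
        if self._future is None:
            return None
        return self._future.exception()


def failing_job():
    raise ValueError("workflow failed")


runner = SketchRunner(failing_job)
runner.run()
runner.wait()
error = runner.exception()
```

Because the `Future` stores the exception, the caller can inspect it after waiting instead of losing it inside the worker thread.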

@xjules xjules self-assigned this Nov 20, 2019
@xjules xjules changed the title Add workflow thread and join correspondingly WIP: Add workflow thread and join correspondingly Nov 20, 2019
@xjules xjules changed the title WIP: Add workflow thread and join correspondingly Add workflow thread and join correspondingly Nov 26, 2019

def run(self):
    try:
        if self._target:
Collaborator

Should this not be done by super instead? I.e. super(WorkflowRunnerThread, self).run()?

Contributor Author

Let's discuss this; I'd like to hear opinions on this one.

super(WorkflowRunnerThread, self).__init__(target=target, name=name)
self.setDaemon(True)
self._exception = None
# in python2.7 it's called __target therefore we need to inject this too
Collaborator

I don't understand this comment.

Contributor Author

Python 2.7 stores the target as thread.__target (name-mangled to _Thread__target), whereas Python 3.3+ stores it as thread._target. We therefore set the attribute thread._target explicitly; on Python 3.3+ this is already handled by __init__.
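
One way to sidestep the attribute-name difference described above is to keep the target under an attribute the subclass owns, rather than relying on `Thread` internals. The sketch below is Python 3 only and is not the code from this PR; `ExceptionCapturingThread`, `_captured_target` and `failing_target` are hypothetical names.

```python
import threading


class ExceptionCapturingThread(threading.Thread):
    """Hypothetical thread that remembers an exception raised by its target."""

    def __init__(self, target, name=None):
        super().__init__(name=name)
        self.daemon = True
        # Own attribute, independent of how Thread names its internal target
        self._captured_target = target
        self.exception = None

    def run(self):
        try:
            self._captured_target()
        except Exception as exc:
            # Keep the exception so the caller can inspect it after join()
            self.exception = exc


def failing_target():
    raise RuntimeError("boom")


thread = ExceptionCapturingThread(target=failing_target)
thread.start()
thread.join()
```

After `join()` returns, `thread.exception` holds the `RuntimeError` raised by the target.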

@markusdregi
Collaborator

markusdregi commented Nov 27, 2019

I still don't see how this PR resolves the given issue. It solves the problem in the tests, but more by changing the test than by adapting the code at large (which I find a bit scary).

I'll try to have a closer look at the usage of the WorkflowRunner and come back to you with something more concrete ;)

self.__workflow.wait()
if self.runner_thread:
    self.runner_thread.join()
print('DBG wait finished with following exception', self.runner_thread._exception)
Contributor

What happens if not self.runner_thread? From what I understand, your print will fail when accessing _exception on it.

Contributor Author

Good point, the print should be inside the if statement.

Contributor

@ManInFez ManInFez left a comment

This looks good to me. It fixes the potential leaking of threads during testing. We should look into how Ert is using this and protect the execution in a similar manner.

Collaborator

@markusdregi markusdregi left a comment

I would like to have another discussion regarding this PR before we merge it when Julius is back in the office :)

@xjules xjules force-pushed the workflow_runner_thread branch 2 times, most recently from cb54f89 to cc664d8 Compare December 17, 2019 12:02
@xjules xjules changed the title Add workflow thread and join correspondingly Add ThreadPoolExecutor to workflowrunner Dec 17, 2019
Collaborator

@markusdregi markusdregi left a comment

Beautiful 👏 Can you evaluate the usage in ERT and make an issue for changing the usage of WorkflowRunner in ERT if that should be done?

workflow_thread.setDaemon(True)
workflow_thread.run = self.__runWorkflow
workflow_thread.start()
#should we check if it's already running?
Collaborator

If that is intended usage then please go ahead ;)

Contributor Author

ok

Collaborator

You should remove the comment and make a decision as to whether you want to check this or not ;) I would lean towards the latter.

Contributor Author

Comment removed!

@@ -20,12 +20,20 @@ def __init__(self, workflow, ert=None, context=None):

self.__context = context
self.__workflow_result = None
self.__workflow_executor = ThreadPoolExecutor(max_workers=1)
Collaborator

A double-underscore prefix is reserved for Python. Please use a single underscore instead 🙏

Contributor Author

Ok, shall I change the existing ones too?

Contributor Author

I opened a new issue for the existing ones: #861
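
For context on the naming remark in this thread: inside a class body, Python name-mangles attributes whose names start with two underscores (and do not end with two), while a single leading underscore is only a convention. The class below is purely illustrative and its names are hypothetical.

```python
class Runner:
    def __init__(self):
        self.__mangled = "double underscore"  # stored as _Runner__mangled
        self._plain = "single underscore"     # stored as _plain


runner = Runner()
# vars() shows the names under which the attributes are actually stored
attribute_names = sorted(vars(runner))
```

Mangling makes such attributes awkward to reach from subclasses and tests, which is one practical reason to prefer the single-underscore form.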

from res.job_queue import Workflow
from res.util.substitution_list import SubstitutionList

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
Collaborator

Do we need both of these imports?

Contributor Author

No, will be gone ... soon :)

@markusdregi
Collaborator

Do we need to update the requirements file and the Komodo dependencies of libres?

@ertomatic

Can one of the admins verify this patch?

@xjules
Contributor Author

xjules commented Jan 6, 2020

> Do we need to update the requirements file and the Komodo dependencies of libres?

No, we shouldn't.

@markusdregi
Collaborator

> > Do we need to update the requirements file and the Komodo dependencies of libres?
>
> No, we shouldn't.

Are you sure? I don't think futures comes with Python 2.7...

@xjules
Contributor Author

xjules commented Jan 6, 2020

> > > Do we need to update the requirements file and the Komodo dependencies of libres?
> >
> > No, we shouldn't.
>
> Are you sure? I don't think futures comes with Python 2.7...

That's right, but it's included in libecl ... but we can make it explicit in libres too. That relates to future, not futures.
`futures` is now included in libres too. The issue agronholm/pythonfutures#29 only relates to spawning new processes, whereas in our case we only spawn new threads within the same process, so we are good to go!

workflow_thread.setDaemon(True)
workflow_thread.run = self.__runWorkflow
workflow_thread.start()
self._workflow_job = self._workflow_executor.submit(self.__runWorkflow)
Collaborator

Are we not leaking a thread now if we call run twice and then cancel arbitrarily many times? If so, I would say that we should raise an Exception if run is called twice, as it is a clear misuse of the API.

Contributor Author

The thread should not leak, as it is part of the thread pool; the job would just be submitted twice. Of course that shouldn't be done, for other reasons, so yes, we can raise an exception here.
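
The point about the pool can be illustrated with a small sketch, assuming a pool created with `max_workers=1` as in the diff above. The helper `report_thread` is hypothetical; it only reports which worker thread executed it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def report_thread():
    # Return the identity of the worker thread that ran this job
    return threading.get_ident()


executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(report_thread)
second = executor.submit(report_thread)  # queued behind the first job
worker_ids = {first.result(), second.result()}
executor.shutdown(wait=True)
```

Both jobs run on the same single worker thread, one after the other, so a second `submit()` queues work rather than creating another thread.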

workflow_thread.run = self.__runWorkflow
workflow_thread.start()
if self.isRunning():
    raise Exception('An instance of workflow is already running!')
Collaborator

Never raise raw exceptions; perhaps an AssertionError?

Contributor Author

@xjules xjules Jan 10, 2020

OK. I was browsing around in libres and still found raw Exceptions ....
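
A sketch of the guard discussed in this thread, with the raw `Exception` replaced by an `AssertionError` as suggested. `GuardedRunner`, `is_running()` and the `release` event are hypothetical names for illustration and do not reproduce the libres implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class GuardedRunner:
    """Hypothetical runner that refuses a second run() while a job is active."""

    def __init__(self, job):
        self._job = job
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._future = None

    def is_running(self):
        # A submitted job counts as running until its Future is done
        return self._future is not None and not self._future.done()

    def run(self):
        if self.is_running():
            raise AssertionError("An instance of workflow is already running!")
        self._future = self._executor.submit(self._job)

    def wait(self):
        if self._future is not None:
            self._future.result()


release = threading.Event()
runner = GuardedRunner(release.wait)  # the job blocks until the event is set

runner.run()
try:
    runner.run()  # second call while the first job is still blocked
    second_run_rejected = False
except AssertionError:
    second_run_rejected = True

release.set()  # let the job finish
runner.wait()
```

Checking the state before submitting keeps a misuse of the API visible to the caller instead of silently queueing a second job.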

- ThreadPoolExecutor replacing Thread in workflow runner
- mock workflow.run to throw exception
- create ThreadPoolExecutor in __init__
- replacing underscores and clean up
- add futures to requirements
- raise AssertionError if workflow is running when starting
Collaborator

@markusdregi markusdregi left a comment

Good job 👍

After this is merged, can you create a new tag on master noting that a new dependency has been added, and add the dependency to the dependency list for both master and the new tag in komodo-releases?

@xjules xjules merged commit 33aa63b into equinor:master Jan 13, 2020
@xjules xjules deleted the workflow_runner_thread branch January 13, 2020 14:18
Development

Successfully merging this pull request may close these issues.

Prevent leaking threads in workflow runner
5 participants