Improve pipeline stop logic to ensure join is called exactly once for all stages #1479
I think your PR makes the fixes specified in the issue, but looking at the code in here, I think we can do more to improve how the pipeline state is handled. I have the following recommendations:

- Convert the `_is_built`, `_is_started` and `_is_stopped` flags into a single member which holds onto the state enum.
  - An enum works well for the state because the pipeline state should only progress forwards.
  - The enum values should look something like: initialized, built, running, stopping, completed.
  - Changes to the state value should be guarded by a mutex to prevent changing the state from multiple threads.
- We should move all of the logic in the `join()` method into `_start()` after the call to `self._mrc_executor.start()`.
  - This is necessary because there are functions in `join()` which need to be called 100% of the time. However, the user is not required to call `pipeline.join()`. It's possible with the current API that stages will never have their `stop()` and `join()` methods called.
  - To fix this, we just need to introduce an `asyncio.Event` into the pipeline.
  - If you follow the example in the docs, we pretty much want to use the same pattern.
    - After `self._mrc_executor.start()` is called, create a new task which immediately calls `self._mrc_executor.join_async()`. This will block the task until the pipeline is complete.
    - After `join_async()` we should have all of the same code which is currently in `Pipeline.join()` to loop over all stages calling `join()`.
    - Finally, the task should set the pipeline state to Complete and call `set()` on the event object.
    - All that should remain in `Pipeline.join()` is calling `await self._completion_event.wait()`, which will block that method from returning until the pipeline finishes.
- We should change the meaning of `stop()` and the meaning of `join()` for stages.
  - `stop()` should only get called 0 or 1 times. The only way it should get called is if `pipeline.stop()` was called, indicating the pipeline should try to shut down gracefully.
    - Users should only implement this method if they have a source stage (or sources in their stage).
  - `join()` should get called exactly 1 time. It should get called when the pipeline is complete and all stages are shut down. This is where users should implement any cleanup code.
- We should store the order that stages were built into a list and use this list when iterating over all the stages in `stop()` and `join()`.
  - This is a small change but will guarantee that stages are stopped and joined in the same order they are built.
- We should add edge condition tests for handling all of these scenarios with the pipeline.
  - We have a few checks but more robust tests would be very powerful here.
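To make the recommendations above concrete, here is a rough sketch of the shape I have in mind. It is not the real pipeline code: `SketchPipeline`, `SketchStage` and `FakeExecutor` are hypothetical names, and `FakeExecutor` only stands in for `self._mrc_executor` (I am assuming nothing about the real executor beyond `start()` and `join_async()`). The allowed state transitions are also my own guess at what "forward only" should mean.

```python
import asyncio
import enum
import threading


class PipelineState(enum.Enum):
    INITIALIZED = 0
    BUILT = 1
    RUNNING = 2
    STOPPING = 3
    COMPLETED = 4


class FakeExecutor:
    """Hypothetical stand-in for the real executor: it finishes once stop() is called."""

    def __init__(self):
        self._done = asyncio.Event()

    def start(self):
        pass

    def stop(self):
        self._done.set()

    async def join_async(self):
        await self._done.wait()


class SketchStage:
    """Hypothetical stage that records its stop()/join() calls in a shared log."""

    def __init__(self, name, log):
        self.name = name
        self._log = log

    def stop(self):
        self._log.append(("stop", self.name))

    async def join(self):
        self._log.append(("join", self.name))


class SketchPipeline:
    def __init__(self, stages):
        self._stages = list(stages)  # kept in build order
        self._state = PipelineState.INITIALIZED
        self._state_lock = threading.Lock()

    def _advance(self, new_state, allowed_from):
        # The lock guards the state; only the listed forward transitions are allowed.
        with self._state_lock:
            if self._state not in allowed_from:
                raise RuntimeError(f"cannot move from {self._state.name} to {new_state.name}")
            self._state = new_state

    def build(self):
        self._advance(PipelineState.BUILT, (PipelineState.INITIALIZED,))

    async def start(self):
        self._advance(PipelineState.RUNNING, (PipelineState.BUILT,))
        self._completion_event = asyncio.Event()
        self._executor = FakeExecutor()
        self._executor.start()
        # The cleanup runs in its own task, so it happens even if nobody calls join().
        self._post_start_task = asyncio.create_task(self._post_start())

    async def _post_start(self):
        await self._executor.join_async()
        for stage in self._stages:  # join in build order, exactly once
            await stage.join()
        self._advance(PipelineState.COMPLETED, (PipelineState.RUNNING, PipelineState.STOPPING))
        self._completion_event.set()

    def stop(self):
        # A second stop() raises, so each stage sees stop() at most once.
        self._advance(PipelineState.STOPPING, (PipelineState.RUNNING,))
        for stage in self._stages:
            stage.stop()
        self._executor.stop()

    async def join(self):
        await self._completion_event.wait()


async def run_demo():
    log = []
    pipe = SketchPipeline([SketchStage("source", log), SketchStage("sink", log)])
    pipe.build()
    await pipe.start()
    pipe.stop()
    await pipe.join()
    await pipe.join()  # a repeated join() just waits on the already-set event
    return pipe, log


if __name__ == "__main__":
    asyncio.run(run_demo())
```

In this sketch calling `pipe.join()` twice is harmless because the stage `join()` calls live in the background task rather than in `Pipeline.join()` itself.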
…into pipeline-stop-fix
…rpheus into pipeline-stop-fix
Just missing tests on the `join()` method. Do the same tests for both normal and out of order uses.
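Something along these lines is what I mean, as a rough sketch only. `TinyPipeline` and `CountingStage` are hypothetical stubs rather than the real classes, and raising `RuntimeError` when `join()` is called before `start()` is my assumption about the desired out-of-order behavior, not what the PR currently does.

```python
import asyncio


class CountingStage:
    """Hypothetical stage that only counts how often join() is called."""

    def __init__(self):
        self.join_calls = 0

    async def join(self):
        self.join_calls += 1


class TinyPipeline:
    """Hypothetical stub with just enough behavior to exercise call ordering."""

    def __init__(self, stage):
        self._stage = stage
        self._completion_event = None  # created in start()

    async def start(self):
        if self._completion_event is not None:
            raise RuntimeError("start() called twice")
        self._completion_event = asyncio.Event()
        # The stub completes immediately: join the stage once, then signal completion.
        await self._stage.join()
        self._completion_event.set()

    async def join(self):
        if self._completion_event is None:
            raise RuntimeError("join() called before start()")
        await self._completion_event.wait()


async def check_normal_order():
    stage = CountingStage()
    pipe = TinyPipeline(stage)
    await pipe.start()
    await pipe.join()
    await pipe.join()  # repeated join() must not join the stage again
    return stage.join_calls


async def check_join_before_start():
    stage = CountingStage()
    pipe = TinyPipeline(stage)
    try:
        await pipe.join()
    except RuntimeError:
        return "raised", stage.join_calls
    return "no error", stage.join_calls
```

The real tests would assert the same two things against the actual pipeline: the stage `join()` count stays at one in the normal order, and the out-of-order call fails without touching the stages.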
…into pipeline-stop-fix
/merge
Description
- Removes the `_is_built`, `_is_started` and `_is_stopped` flags and replaces them with a single member which holds onto the state enum: INITIALIZED, BUILT, STARTED, STOPPED, COMPLETED
- Changes the meaning of `stop()` and the meaning of `join()` for stages
  - `stop()` is called 0 or 1 times. The only way it can get called is if `pipeline.stop()` was called, indicating the pipeline should try to shut down gracefully.
  - `join()` is called exactly 1 time. It is only called when the pipeline is complete and all stages are shut down. This is where users should implement any cleanup code.

Closes #1477
By Submitting this PR I confirm: