Extending GeneratorEnqueuer to handle finite generators. #8104
This change will require a unit test.
keras/utils/data_utils.py
Outdated
@@ -536,11 +536,12 @@ def stop(self, timeout=None):

class GeneratorEnqueuer(SequenceEnqueuer):
    """Builds a queue out of a data generator.
    The provided generator can be finite in which case the class will throw a StopIteration exception.
Introduce a line break after the first docstring line, and format the line you added into 2 shorter lines. Add code delimiters around `StopIteration`.
@fchollet Roger. I'll send an update next week. :)
@fchollet I think it's ready from my part. Please have a look and let me know. I added two additional tests that cover the case of the finite generator (one with threads and one with processes). I also noticed that the PR is partially covered by the two test_generator_enqueuer_fail_* tests. Apologies for the multiple commits; I had a hard time with PEP8 policies as I don't have it installed on my system. I can squash the commits if you want, but I think you typically do this on merge.
def test_finite_generator_enqueuer_processes():
    enqueuer = GeneratorEnqueuer(create_finite_generator_from_sequence_pcs(
@Dref360 Isn't this tested directly in the for loop? The StopIteration causes the for loop to exit. Do you mean placing a next() call after the end of the loop to show that it throws an exception?
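To make the question concrete, here is a minimal sketch using a plain Python generator rather than the enqueuer itself. The name `finite_batches` is hypothetical and exists only for illustration. It shows that a for loop ends silently when the generator is exhausted, while an explicit `next()` call afterwards surfaces the `StopIteration`.

```python
def finite_batches(n):
    """Hypothetical finite generator yielding n batch indices."""
    for i in range(n):
        yield i


gen = finite_batches(3)

# The for loop consumes StopIteration internally and simply exits.
seen = [batch for batch in gen]

# An explicit next() on the exhausted generator exposes the exception.
try:
    next(gen)
    raised = False
except StopIteration:
    raised = True
```

A test that only iterates with a for loop therefore checks termination implicitly; adding a `next()` call after the loop would check the exception explicitly.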
LGTM, thanks
The GeneratorEnqueuer class is used by Keras to generate batches of data. Even though it's an internal class, it can be used for building fast training and prediction pipelines that do not use the *_generator() methods. One core assumption of the class is that the underlying generator can produce an infinite amount of data. This is useful for training but can be limiting when making predictions (especially when combining Spark and Keras).
This PR extends GeneratorEnqueuer to handle finite generators; it was tested with both threads and processes. The main idea is that we catch the StopIteration raised when the underlying generator is exhausted and gracefully exit the thread. When we call get() on the enqueuer and the queue is empty, instead of sleeping directly we check whether any of the threads are still active. If all of them are inactive, we raise a StopIteration exception; if at least one thread is still running, we sleep.
The changes should have no effect if the underlying iterator is infinite. When it is finite, we raise a StopIteration exception once the underlying generator is exhausted to inform the user of the class that the data have been processed.
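The mechanism described above can be sketched roughly as follows. This is a simplified, threads-only illustration and not the actual Keras implementation: the class name `FiniteEnqueuer`, its parameters, and the lock around the generator are assumptions made for the example. Because `get()` is written as a generator, it uses `return` to end iteration (the consumer sees this as `StopIteration`); raising `StopIteration` explicitly inside a generator is an error in Python 3.7+ (PEP 479).

```python
import queue
import threading
import time


class FiniteEnqueuer:
    """Hypothetical, simplified stand-in for GeneratorEnqueuer (threads only)."""

    def __init__(self, generator, workers=2, max_queue_size=10, wait_time=0.01):
        self._generator = generator
        self._lock = threading.Lock()  # assumption: serialize access to the generator
        self._queue = queue.Queue(maxsize=max_queue_size)
        self._wait_time = wait_time
        self._threads = [
            threading.Thread(target=self._produce, daemon=True)
            for _ in range(workers)
        ]

    def _produce(self):
        while True:
            try:
                with self._lock:
                    item = next(self._generator)
            except StopIteration:
                return  # generator exhausted: exit the worker gracefully
            self._queue.put(item)

    def start(self):
        for thread in self._threads:
            thread.start()

    def get(self):
        while True:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                if any(t.is_alive() for t in self._threads):
                    # At least one worker may still produce data: wait.
                    time.sleep(self._wait_time)
                elif self._queue.empty():
                    # All workers finished and nothing is left: stop iterating.
                    return
                # Otherwise a worker enqueued an item just before exiting;
                # loop again to pick it up.
            else:
                yield item


enqueuer = FiniteEnqueuer(iter(range(100)), workers=3)
enqueuer.start()
results = list(enqueuer.get())
```

With more than one worker the order of `results` is not guaranteed, so a check on the output should compare sorted values. With an infinite generator the workers never exit, so `get()` in this sketch only ever takes the sleep branch when the queue is empty.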